Source code for cocotb.monitors.xgmii

# Copyright (c) 2013 Potential Ventures Ltd
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of Potential Ventures Ltd nor the names of its
#       contributors may be used to endorse or promote products derived from this
#       software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Monitor for XGMII (10 Gigabit Media Independent Interface)."""

# By default cast to scapy packets, otherwise we pass the string of bytes
try:
    from scapy.all import Ether
    _have_scapy = True
except ImportError:
    _have_scapy = False

import struct
import zlib

import cocotb
from cocotb.utils import hexdump
from cocotb.monitors import Monitor
from cocotb.triggers import RisingEdge

_XGMII_IDLE      = "\x07"  # noqa
_XGMII_START     = "\xFB"  # noqa
_XGMII_TERMINATE = "\xFD"  # noqa

_PREAMBLE_SFD = "\x55\x55\x55\x55\x55\x55\xD5"


[docs]class XGMII(Monitor): """XGMII (10 Gigabit Media Independent Interface) Monitor. Assumes a single vector, either 4 or 8 bytes plus control bit for each byte. If interleaved is ``True`` then the control bits are adjacent to the bytes. """ def __init__(self, signal, clock, interleaved=True, callback=None, event=None): """Args: signal (SimHandle): The XGMII data bus. clock (SimHandle): The associated clock (assumed to be driven by another coroutine). interleaved (bool, optional): Whether control bits are interleaved with the data bytes or not. If interleaved the bus is byte0, byte0_control, byte1, byte1_control, ... Otherwise expect byte0, byte1, ..., byte0_control, byte1_control, ... """ self.log = signal._log self.clock = clock self.signal = signal self.bytes = len(self.signal) // 9 self.interleaved = interleaved Monitor.__init__(self, callback=callback, event=event) def _get_bytes(self): """Take a value and extract the individual bytes and control bits. Returns a tuple of lists. """ value = self.signal.value.integer bytes = [] ctrls = [] byte_shift = 8 ctrl_base = 8 * self.bytes ctrl_inc = 1 if self.interleaved: byte_shift += 1 ctrl_base = 8 ctrl_inc = 9 for i in range(self.bytes): bytes.append(chr((value >> (i * byte_shift)) & 0xff)) ctrls.append(bool(value & (1 << ctrl_base))) ctrl_base += ctrl_inc return ctrls, bytes def _add_payload(self, ctrl, bytes): """Take the payload and return true if more to come""" for index, byte in enumerate(bytes): if ctrl[index]: if byte != _XGMII_TERMINATE: self.log.error("Got control character in XGMII payload") self.log.info("data = :" + " ".join(["%02X" % ord(b) for b in bytes])) self.log.info("ctrl = :" + " ".join(["%s" % str(c) for c in ctrl])) self._pkt = "" return False self._pkt += byte return True @cocotb.coroutine def _monitor_recv(self): clk = RisingEdge(self.clock) self._pkt = "" while True: yield clk ctrl, bytes = self._get_bytes() if ctrl[0] and bytes[0] == _XGMII_START: ctrl, bytes = ctrl[1:], bytes[1:] while self._add_payload(ctrl, bytes): yield clk ctrl, bytes = self._get_bytes() elif self.bytes == 8 : if ctrl[4] and bytes[4] == _XGMII_START: ctrl, bytes = ctrl[5:], bytes[5:] while self._add_payload(ctrl, bytes): yield clk ctrl, bytes = self._get_bytes() if self._pkt: self.log.debug("Received:\n%s" % (hexdump(self._pkt))) if len(self._pkt) < 64 + 7: self.log.error("Received a runt frame!") if len(self._pkt) < 12: self.log.error("No data to extract") self._pkt = "" continue preamble_sfd = self._pkt[0:7] crc32 = self._pkt[-4:] payload = self._pkt[7:-4] if preamble_sfd != _PREAMBLE_SFD: self.log.error("Got a frame with unknown preamble/SFD") self.log.error(hexdump(preamble_sfd)) self._pkt = "" continue expected_crc = struct.pack("<I", (zlib.crc32(payload) & 0xFFFFFFFF)) if crc32 != expected_crc: self.log.error("Incorrect CRC on received packet") self.log.info("Expected: %s" % (hexdump(expected_crc))) self.log.info("Received: %s" % (hexdump(crc32))) # Use scapy to decode the packet if _have_scapy: p = Ether(payload) self.log.debug("Received decoded packet:\n%s" % p.show2()) else: p = payload self._recv(p) self._pkt = ""