Source code for cocotb.drivers.amba

# Copyright (c) 2014 Potential Ventures Ltd
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of Potential Ventures Ltd,
#       SolarFlare Communications Inc nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Drivers for Advanced Microcontroller Bus Architecture."""

import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Lock
from cocotb.drivers import BusDriver
from cocotb.result import ReturnValue
from cocotb.binary import BinaryValue

import array


class AXIProtocolError(Exception):
    pass


[docs]class AXI4LiteMaster(BusDriver): """AXI4-Lite Master. TODO: Kill all pending transactions if reset is asserted. """ _signals = ["AWVALID", "AWADDR", "AWREADY", # Write address channel "WVALID", "WREADY", "WDATA", "WSTRB", # Write data channel "BVALID", "BREADY", "BRESP", # Write response channel "ARVALID", "ARADDR", "ARREADY", # Read address channel "RVALID", "RREADY", "RRESP", "RDATA"] # Read data channel def __init__(self, entity, name, clock): BusDriver.__init__(self, entity, name, clock) # Drive some sensible defaults (setimmediatevalue to avoid x asserts) self.bus.AWVALID.setimmediatevalue(0) self.bus.WVALID.setimmediatevalue(0) self.bus.ARVALID.setimmediatevalue(0) self.bus.BREADY.setimmediatevalue(1) self.bus.RREADY.setimmediatevalue(1) # Mutex for each channel that we master to prevent contention self.write_address_busy = Lock("%s_wabusy" % name) self.read_address_busy = Lock("%s_rabusy" % name) self.write_data_busy = Lock("%s_wbusy" % name) @cocotb.coroutine def _send_write_address(self, address, delay=0): """ Send the write address, with optional delay (in clocks) """ yield self.write_address_busy.acquire() for cycle in range(delay): yield RisingEdge(self.clock) self.bus.AWADDR <= address self.bus.AWVALID <= 1 while True: yield ReadOnly() if self.bus.AWREADY.value: break yield RisingEdge(self.clock) yield RisingEdge(self.clock) self.bus.AWVALID <= 0 self.write_address_busy.release() @cocotb.coroutine def _send_write_data(self, data, delay=0, byte_enable=0xF): """Send the write address, with optional delay (in clocks).""" yield self.write_data_busy.acquire() for cycle in range(delay): yield RisingEdge(self.clock) self.bus.WDATA <= data self.bus.WVALID <= 1 self.bus.WSTRB <= byte_enable while True: yield ReadOnly() if self.bus.WREADY.value: break yield RisingEdge(self.clock) yield RisingEdge(self.clock) self.bus.WVALID <= 0 self.write_data_busy.release()
[docs] @cocotb.coroutine def write(self, address, value, byte_enable=0xf, address_latency=0, data_latency=0, sync=True): """Write a value to an address. Args: address (int): The address to write to. value (int): The data value to write. byte_enable (int, optional): Which bytes in value to actually write. Default is to write all bytes. address_latency (int, optional): Delay before setting the address (in clock cycles). Default is no delay. data_latency (int, optional): Delay before setting the data value (in clock cycles). Default is no delay. sync (bool, optional): Wait for rising edge on clock initially. Defaults to True. Returns: BinaryValue: The write response value. Raises: AXIProtocolError: If write response from AXI is not ``OKAY``. """ if sync: yield RisingEdge(self.clock) c_addr = cocotb.fork(self._send_write_address(address, delay=address_latency)) c_data = cocotb.fork(self._send_write_data(value, byte_enable=byte_enable, delay=data_latency)) if c_addr: yield c_addr.join() if c_data: yield c_data.join() # Wait for the response while True: yield ReadOnly() if self.bus.BVALID.value and self.bus.BREADY.value: result = self.bus.BRESP.value break yield RisingEdge(self.clock) yield RisingEdge(self.clock) if int(result): raise AXIProtocolError("Write to address 0x%08x failed with BRESP: %d" % (address, int(result))) raise ReturnValue(result)
[docs] @cocotb.coroutine def read(self, address, sync=True): """Read from an address. Args: address (int): The address to read from. sync (bool, optional): Wait for rising edge on clock initially. Defaults to True. Returns: BinaryValue: The read data value. Raises: AXIProtocolError: If read response from AXI is not ``OKAY``. """ if sync: yield RisingEdge(self.clock) self.bus.ARADDR <= address self.bus.ARVALID <= 1 while True: yield ReadOnly() if self.bus.ARREADY.value: break yield RisingEdge(self.clock) yield RisingEdge(self.clock) self.bus.ARVALID <= 0 while True: yield ReadOnly() if self.bus.RVALID.value and self.bus.RREADY.value: data = self.bus.RDATA.value result = self.bus.RRESP.value break yield RisingEdge(self.clock) if int(result): raise AXIProtocolError("Read address 0x%08x failed with RRESP: %d" % (address, int(result))) raise ReturnValue(data)
def __len__(self): return 2**len(self.bus.ARADDR)
[docs]class AXI4Slave(BusDriver): ''' AXI4 Slave Monitors an internal memory and handles read and write requests. ''' _signals = [ "ARREADY", "ARVALID", "ARADDR", # Read address channel "ARLEN", "ARSIZE", "ARBURST", "ARPROT", "RREADY", "RVALID", "RDATA", "RLAST", # Read response channel "AWREADY", "AWADDR", "AWVALID", # Write address channel "AWPROT", "AWSIZE", "AWBURST", "AWLEN", "WREADY", "WVALID", "WDATA", ] # Not currently supported by this driver _optional_signals = [ "WLAST", "WSTRB", "BVALID", "BREADY", "BRESP", "RRESP", "RCOUNT", "WCOUNT", "RACOUNT", "WACOUNT", "ARLOCK", "AWLOCK", "ARCACHE", "AWCACHE", "ARQOS", "AWQOS", "ARID", "AWID", "BID", "RID", "WID" ] def __init__(self, entity, name, clock, memory, callback=None, event=None, big_endian=False): BusDriver.__init__(self, entity, name, clock) self.clock = clock self.big_endian = big_endian self.bus.ARREADY.setimmediatevalue(1) self.bus.RVALID.setimmediatevalue(0) self.bus.RLAST.setimmediatevalue(0) self.bus.AWREADY.setimmediatevalue(1) self._memory = memory self.write_address_busy = Lock("%s_wabusy" % name) self.read_address_busy = Lock("%s_rabusy" % name) self.write_data_busy = Lock("%s_wbusy" % name) cocotb.fork(self._read_data()) cocotb.fork(self._write_data()) def _size_to_bytes_in_beat(self, AxSIZE): if AxSIZE < 7: return 2 ** AxSIZE return None @cocotb.coroutine def _write_data(self): clock_re = RisingEdge(self.clock) while True: while True: self.bus.WREADY <= 0 yield ReadOnly() if self.bus.AWVALID.value: self.bus.WREADY <= 1 break yield clock_re yield ReadOnly() _awaddr = int(self.bus.AWADDR) _awlen = int(self.bus.AWLEN) _awsize = int(self.bus.AWSIZE) _awburst = int(self.bus.AWBURST) _awprot = int(self.bus.AWPROT) burst_length = _awlen + 1 bytes_in_beat = self._size_to_bytes_in_beat(_awsize) if __debug__: self.log.debug( "AWADDR %d\n" % _awaddr + "AWLEN %d\n" % _awlen + "AWSIZE %d\n" % _awsize + "AWBURST %d\n" % _awburst + "BURST_LENGTH %d\n" % burst_length + "Bytes in beat %d\n" % bytes_in_beat) burst_count = burst_length yield clock_re while True: if self.bus.WVALID.value: word = self.bus.WDATA.value word.big_endian = self.big_endian _burst_diff = burst_length - burst_count _st = _awaddr + (_burst_diff * bytes_in_beat) # start _end = _awaddr + ((_burst_diff + 1) * bytes_in_beat) # end self._memory[_st:_end] = array.array('B', word.get_buff()) burst_count -= 1 if burst_count == 0: break yield clock_re @cocotb.coroutine def _read_data(self): clock_re = RisingEdge(self.clock) while True: while True: yield ReadOnly() if self.bus.ARVALID.value: break yield clock_re yield ReadOnly() _araddr = int(self.bus.ARADDR) _arlen = int(self.bus.ARLEN) _arsize = int(self.bus.ARSIZE) _arburst = int(self.bus.ARBURST) _arprot = int(self.bus.ARPROT) burst_length = _arlen + 1 bytes_in_beat = self._size_to_bytes_in_beat(_arsize) word = BinaryValue(n_bits=bytes_in_beat*8, bigEndian=self.big_endian) if __debug__: self.log.debug( "ARADDR %d\n" % _araddr + "ARLEN %d\n" % _arlen + "ARSIZE %d\n" % _arsize + "ARBURST %d\n" % _arburst + "BURST_LENGTH %d\n" % burst_length + "Bytes in beat %d\n" % bytes_in_beat) burst_count = burst_length yield clock_re while True: self.bus.RVALID <= 1 yield ReadOnly() if self.bus.RREADY.value: _burst_diff = burst_length - burst_count _st = _araddr + (_burst_diff * bytes_in_beat) _end = _araddr + ((_burst_diff + 1) * bytes_in_beat) word.buff = self._memory[_st:_end].tostring() self.bus.RDATA <= word if burst_count == 1: self.bus.RLAST <= 1 yield clock_re burst_count -= 1 self.bus.RLAST <= 0 if burst_count == 0: break