toffee.bundle 源代码

__all__ = [
    "Bundle",
    "WriteMode",
    "DummySignal",
    "Signal",
    "Signals",
    "SignalList",
    "BundleList",
]

import random
import re
from enum import Enum
from queue import Queue
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from ._base import MObject
from .logger import *


[文档] class DummySignal: """ A dummy signal class that does nothing. It will return None when accessed, and do nothing when set. """ def __setattr__(self, __name, __value): pass def __getattribute__(self, __name): return None
[文档] class WriteMode(Enum): """ The write mode of a bundle. """ Imme = 0 Rise = 1 Fall = 2
class UnconnectedSignal: def __getattribute__(self, name: str): raise AttributeError(f"Can't access unconnected signal") def __setattr__(self, name: str, value): raise AttributeError(f"Can't access unconnected signal") def __str__(self): return "UnconnectedSignal()"
[文档] class Signal(UnconnectedSignal): ...
[文档] def Signals(num: int): if num == 1: return Signal() return [Signal() for _ in range(num)]
[文档] class SignalList: def __init__(self, format: str, limit: int, rule=None): assert limit > 0, "limit must be greater than 0" assert "#" in format, "format must contain at least one #" self.format = format self.signals = [Signal() for _ in range(limit)] self.names = [] if rule is None: rule = lambda num: str(num) for i in range(limit): self.names.append(format.replace("#", rule(i)))
[文档] def bind_signal( self, bundle, signal_name: str, signal, info_bundle_name, info_dut_name ): assert ( signal_name in self.names ), f"signal name {signal_name} not in signal list" index = self.names.index(signal_name) self.signals[index] = signal bundle._Bundle__update_signal_info(signal) info( f'dut\'s signal "{info_dut_name}" is connected to "{info_bundle_name}[{index}]"' )
[文档] def assign(self, value): assert len(value) == len( self.signals ), "value length must match signal list length" for i, signal in enumerate(self.signals): signal.value = value[i]
def __getitem__(self, key): return self.signals[key] def __str__(self): signal_str = ", ".join( f"{idx}: {signal}" for idx, signal in enumerate(self.signals) ) return f"SignalList({signal_str})"
[文档] @classmethod def from_signallist(cls, old_signal_list: "SignalList"): signal_list = cls("#", 1) signal_list.format = old_signal_list.format signal_list.signals = old_signal_list.signals[:] signal_list.names = old_signal_list.names[:] return signal_list
[文档] class BundleList: def __init__(self, bundle, format: str, limit: int, rule=None): assert limit > 0, "limit must be greater than 0" assert "#" in format, "format must contain at least one #" self.format = format self.names = [] self.bundles = [] if rule is None: rule = lambda num: str(num) for i in range(limit): name = format.replace("#", rule(i)) self.names.append(name) self.bundles.append(bundle.from_prefix(name))
[文档] def assign(self, value, multilevel=True): for i, bundle in enumerate(self.bundles): bundle.assign(value[i], multilevel)
def __len__(self): return len(self.bundles) def __getitem__(self, key): return self.bundles[key] def __str__(self): bundle_str = ", ".join( f"{idx}: {bundle}" for idx, bundle in enumerate(self.bundles) ) return f"BundleList({bundle_str})"
[文档] @classmethod def from_bundlelist(cls, old_bundle_list: "BundleList"): bundle_list = cls(old_bundle_list.bundles[0], "#", 1) bundle_list.format = old_bundle_list.format bundle_list.names = old_bundle_list.names[:] bundle_list.bundles = old_bundle_list.bundles[:] return bundle_list
class BindMethod(MObject): """ A bind method is a way to connect signals to a bundle. """ def __init__(self, method, value): self.method = method self.method_value = value def bind(self, bundle, all_signals, level_string, detection_mode): """ Bind the signals to the bundle. Args: bundle: The bundle to bind the signals to. all_signals: A list of signals to bind. level_string: The string of the current level. detection_mode: Whether the method is in detection mode. if it is, the bundle will not be connected to the signals. Returns: A tuple of three lists: - A list of connected signals. - A list of matching signals that are not connected. The item's name in this list is changed. - A list of signals that are not matched. """ raise NotImplementedError @staticmethod def get_matching_signal_list(bundle, signal_name: str) -> SignalList: """ Get the matching signal list in the bundle. Args: bundle: The bundle to get the matching signal list from. signal_name: The name of the signal to get. Returns: (signal list name, signal list) if the signal is found in the bundle, None otherwise. """ for signal_list_name, signal_list in bundle._Bundle__all_signal_lists(): if signal_name in signal_list.names: return (signal_list_name, signal_list) return None class PrefixBindMethod(BindMethod): """ A bind method that connects signals to a bundle by matching the prefix of the signal name. """ def __init__(self, prefix): super().__init__("prefix", prefix) def bind(self, bundle, all_signals, level_string, detection_mode): connected_signals = [] # Matched and connected signals matching_signals = [] # Matched but not connected signals, # item's name in the list is the name without prefix remain_signals = [] # Not matched signals prefix = self.method_value for signal in all_signals: if signal["name"].startswith(prefix): name_no_prefix = signal["name"][len(prefix) :] if name_no_prefix in bundle.current_level_signals: if not detection_mode: bundle._Bundle__add_signal_attr( name_no_prefix, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, name_no_prefix ), ) connected_signals.append(name_no_prefix) elif signal_list_name_and_inst := self.get_matching_signal_list( bundle, name_no_prefix ): if not detection_mode: signal_list_name_and_inst[1].bind_signal( bundle, name_no_prefix, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, signal_list_name_and_inst[0] ), ) connected_signals.append(name_no_prefix) else: matching_signals.append( { "name": signal["name"][len(prefix) :], "org_name": signal["org_name"], "signal": signal["signal"], } ) else: remain_signals.append(signal) return (connected_signals, matching_signals, remain_signals) class RegexBindMethod(BindMethod): """ A bind method that connects signals to a bundle by matching the regex of the signal name. """ def __init__(self, regex): super().__init__("regex", regex) def bind(self, bundle, all_signals, level_string, detection_mode): connected_signals = [] # Matched and connected signals matching_signals = [] # Matched but not connected signals, # item's name in the list is the name in the captured group remain_signals = [] # Not matched signals regex = self.method_value for signal in all_signals: match = re.search(regex, signal["name"]) if match is not None: groups = ["" if x is None else x for x in match.groups()] name = "".join(groups) if name in bundle.current_level_signals: if not detection_mode: bundle._Bundle__add_signal_attr( name, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, name ), ) connected_signals.append(name) elif signal_list_name_and_inst := self.get_matching_signal_list( bundle, name ): if not detection_mode: signal_list_name_and_inst[1].bind_signal( bundle, name, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, signal_list_name_and_inst[0] ), ) connected_signals.append(name) else: matching_signals.append( { "name": name, "org_name": signal["org_name"], "signal": signal["signal"], } ) else: remain_signals.append(signal) return (connected_signals, matching_signals, remain_signals) class DictBindMethod(BindMethod): """ A bind method that connects signals to a bundle by matching the dictionary. """ def __init__(self, dict): super().__init__("dict", dict) def bind(self, bundle, all_signals, level_string, detection_mode): connected_signals = [] # Matched and connected signals matching_signals = [] # Matched but not connected signals, # item's name in the list is the name without prefix remain_signals = [] # Not matched signals dict = self.method_value for signal in all_signals: if signal["name"] in dict.values(): name = list(dict.keys())[list(dict.values()).index(signal["name"])] if name in bundle.current_level_signals: if not detection_mode: bundle._Bundle__add_signal_attr( name, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, name ), ) connected_signals.append(name) elif signal_list_name_and_inst := self.get_matching_signal_list( bundle, name ): if not detection_mode: signal_list_name_and_inst[1].bind_signal( bundle, name, signal["signal"], info_dut_name=signal["org_name"], info_bundle_name=Bundle._Bundle__appended_level_string( level_string, signal_list_name_and_inst[0] ), ) connected_signals.append(name) else: matching_signals.append( { "name": name, "org_name": signal["org_name"], "signal": signal["signal"], } ) else: remain_signals.append(signal) return (connected_signals, matching_signals, remain_signals)
[文档] class Bundle(MObject): """ A bundle is a collection of signals in a DUT. """ signals = [] def __init__(self): """ Create a bundle. Instances created using this method are matched directly in bind. The create instance method provided by from_dict, from_prefix and from_regex enable easier connections. """ self.name = "" # The name of the bundle self.bound = False # Whether the bundle is bound to a DUT self.write_mode = None # The write mode of the bundle self.current_level_signals = [] # The signal names in the current level self.__clock_event = None self.__connect_method = PrefixBindMethod("") self.__dut_requests__ = Queue() self.__dut_instance__ = None self.__blocked_request__ = None self.__set_current_level_signal() # Recreate subbundles and signal lists for each instance for sub_bundle_name, sub_bundle in self.__all_sub_bundles(): new_sub_bundle = sub_bundle.__class__() new_sub_bundle.current_level_signals = sub_bundle.current_level_signals new_sub_bundle.__connect_method = sub_bundle.__connect_method setattr(self, sub_bundle_name, new_sub_bundle) # Recreate signal lists for each instance for signal_list_name, signal_list in self.__all_signal_lists(): new_signal_list = SignalList.from_signallist(signal_list) setattr(self, signal_list_name, new_signal_list) # Recreate bundle lists for each instance for bundle_list_name, bundle_list in self.__all_bundle_lists(): new_bundle_list = BundleList.from_bundlelist(bundle_list) setattr(self, bundle_list_name, new_bundle_list) def ___dut_call_on_rise__(self, cycle): """ Call the on rise method of target DUT. Args: cycle: The cycle of the rise. """ request = None if self.__blocked_request__ is not None: if self.__blocked_request__["__condition_func__"]( cycle, self, self.__blocked_request__.get("__condition_args__", None) ): request = self.__blocked_request__ request.pop("__condition_func__", None) request.pop("__condition_args__", None) self.__blocked_request__ = None else: return if request is None: if self.__dut_requests__.empty(): return request = self.__dut_requests__.get() if request is None: return if callable(request): data = request(cycle, self) if data is None: return request = data # check condition if "__condition_func__" in request: if not request["__condition_func__"]( cycle, self, request.get("__condition_args__", None) ): self.__blocked_request__ = request return request.pop("__condition_func__", None) request.pop("__condition_args__", None) # callbacks funcs = request.pop("__funcs__", None) return_bundles = request.pop("__return_bundles__", None) funcreturns = [] if not isinstance(funcs, list): funcs = [funcs] self.assign(request) for func in funcs: if callable(func): funcreturns.append(func(cycle, self)) else: funcreturns.append(None) if len(funcreturns) == 1: funcreturns = funcreturns[0] # return bundle if return_bundles is None: return if not isinstance(return_bundles, list): return_bundles = [return_bundles] ret_data = [] for return_bundle in return_bundles: if isinstance(return_bundle, Bundle): ret_data.append(return_bundle.as_dict()) if len(ret_data) == 1: ret_data = ret_data[0] # save return data in request request["__funcs_return__"] = funcreturns request["__return_values__"] = ret_data request["__return_cycles__"] = cycle
[文档] def make_requset_response_for(self, dut): """ Make a request response for the dut. Args: dut: The dut to make the request response for. """ if self.__dut_instance__ is not None: error("The dut instance is already set") dut.StepRis(self.___dut_call_on_rise__) self.__dut_instance__ = dut
[文档] def process_requests(self, request: Optional[Union[Dict, List[Dict]]]): """ Process the requests. Args: request: The request to process. """ for key in [ "__condition_func__", "__condition_args__", "__funcs__", "__return_bundles__", ]: if hasattr(self, key): error(f"bundule can not with name: {key}") assert self.__dut_requests__.empty(), "The request queue is not empty" if self.__dut_instance__ is None: error( "The dut instance is not set, need to call make_requset_response_for first" ) if not isinstance(request, list): request = [request] for req in request: self.__dut_requests__.put(req) while not self.__dut_requests__.empty(): self.__dut_instance__.Step(1) ret = [] for req in request: if isinstance(req, dict) and "__return_values__" in req: ret.append( { "data": req["__return_values__"], "cycle": req["__return_cycles__"], } ) ret[-1]["__funcs_return__"] = req["__funcs_return__"] return ret
[文档] def set_name(self, name): """ Set the name of the bundle. Args: name: The name of the bundle. Returns: The bundle itself. """ self.name = name return self
[文档] def set_prefix(self, prefix=""): """ Set the bundle to bind from prefix. Args: prefix: The prefix to match the signals. Returns: The bundle itself. """ self.__connect_method = PrefixBindMethod(prefix) return self
[文档] def set_regex(self, regex=r""): """ Set the bundle to bind from regex. Args: regex: The regex to match the signals. Returns: The bundle itself. """ self.__connect_method = RegexBindMethod(regex) return self
[文档] def set_dict(self, dict={}): """ Set the bundle to bind from a dictionary. Args: dict: The dictionary to guide bundle connections Returns: The bundle itself. """ self.__connect_method = DictBindMethod(dict) self.__check_dict_value(dict) return self
[文档] def set_write_mode(self, write_mode: WriteMode): """ Set the write mode of the bundle. it will change the write mode of all signals in the bundle. Args: write_mode: The write mode to set. Returns: The bundle itself. """ self.write_mode = write_mode self.__set_all_signals_write_mode(write_mode) return self
def __set_all_signals_write_mode(self, write_mode: WriteMode): """ Set the write mode of all signals in the bundle. Args: write_mode: The write mode to set. """ for _, signal in self.all_signals(): if Bundle.__is_instance_of_xpin(signal) and not signal.IsOutIO(): if write_mode == WriteMode.Imme: signal.AsImmWrite() elif write_mode == WriteMode.Rise: signal.AsRiseWrite() elif write_mode == WriteMode.Fall: signal.AsFallWrite() else: raise ValueError("write mode must be Imme, Rise, or Fall")
[文档] def set_write_mode_as_imme(self): """ Set the write mode of the bundle to immediate. Returns: The bundle itself. """ return self.set_write_mode(WriteMode.Imme)
[文档] def set_write_mode_as_rise(self): """ Set the write mode of the bundle to rise. Returns: The bundle itself. """ return self.set_write_mode(WriteMode.Rise)
[文档] def set_write_mode_as_fall(self): """ Set the write mode of the bundle to fall. Returns: The bundle itself. """ return self.set_write_mode(WriteMode.Fall)
[文档] async def step(self, ncycles=1): """ Wait for the clock for ncycles. Only works if the bundle has a connected signal. Args: ncycles: The number of cycles to wait. """ if self.__clock_event is None: critical("cannot use step in bundle without a connected signal") for _ in range(ncycles): await self.__clock_event.wait()
[文档] def bind(self, dut, unconnected_signal_access=True): """ Bind the dut's signal to this bundle. it will overwrites the previous bind. Args: dut: The dut to bind the bundle to. unconnected_signal_access: Whether unconnected signals could be accessed. Returns: The bundle itself. """ if self.bound: warning("bundle is already bound, the previous bind will be overwritten") self.__unbind_all() else: # When first bind, set all sub-bundles' name for sub_bundle_name, sub_bundle in self.__all_sub_bundles(): sub_bundle.set_name(sub_bundle_name) self.__bind_from_signal_list( list(self.dut_all_signals(dut)), self.name, [], unconnected_signal_access, False, None, None, ) self.bound = True if self.write_mode is not None: self.__set_all_signals_write_mode(self.write_mode) self.make_requset_response_for(dut) return self
[文档] def as_dict(self, multilevel=True): """ Collect all signals values into a dictionary. Args: multilevel: When multilevel is true, the subbundle signal values are put into a secondary dictionary. Otherwise, the dictionary returned will have only one level, with the sub-bundles separated by dots in keys. Returns: A dictionary of all signals values in the bundle. """ if multilevel: signals = { signal: getattr(self, signal).value for signal in self.current_level_signals } signal_lists = { signal_list_name: [signal.value for signal in signal_list] for signal_list_name, signal_list in self.__all_signal_lists() } sub_bundles = { sub_bundle_name: getattr(self, sub_bundle_name).as_dict(multilevel) for sub_bundle_name, _ in self.__all_sub_bundles() } sub_bundle_lists = { sub_bundle_list_name: [ sub_bundle.as_dict(multilevel) for sub_bundle in sub_bundle_list ] for sub_bundle_list_name, sub_bundle_list in self.__all_bundle_lists() } return {**signals, **signal_lists, **sub_bundles, **sub_bundle_lists} else: signals = { signal: getattr(self, signal).value for signal in self.current_level_signals } for signal_list_name, signal_list in self.__all_signal_lists(): signals.update( {signal_list_name: [signal.value for signal in signal_list]} ) for sub_bundle_name, sub_bundle in self.__all_sub_bundles(): sub_bundle_dict = sub_bundle.as_dict(multilevel) for sub_bundle_signal, value in sub_bundle_dict.items(): signals[f"{sub_bundle_name}.{sub_bundle_signal}"] = value for bundle_list_name, bundle_list in self.__all_bundle_lists(): for i, bundle in enumerate(bundle_list): bundle_dict = bundle.as_dict(multilevel) for bundle_signal, value in bundle_dict.items(): signals[f"{bundle_list_name}[{i}].{bundle_signal}"] = value return signals
[文档] def set_all(self, value): """ Set all signals values to a value, including sub-bundles. Args: value: The value to set. Returns: The bundle itself. """ for _, signal in self.all_signals(): if Bundle.__is_instance_of_xpin(signal) and not signal.IsOutIO(): signal.value = value return self
[文档] def randomize_all( self, value_range=None, exclude_signals=[], random_func=random.randint ): """ Randomize all signals values in the bundle. Args: value_range: The range of the random values, eg. (0, 100), both values are inclusive. If None, the range of random values will be the range of each signal value. exclude_signals: A list of signals to exclude from randomization, sub-bundle names are separated by dots. random_func: The random function to use, default is random.randint. """ for signal_name, signal in self.all_signals(): if ( signal_name not in exclude_signals and Bundle.__is_instance_of_xpin(signal) and not signal.IsOutIO() ): if value_range is not None: signal.value = random_func(value_range[0], value_range[1]) else: signal_width = signal.W() if signal_width == 0: signal_width = 1 signal.value = random_func(0, 2**signal_width - 1)
[文档] def assign(self, item, multilevel=True, level_string=""): """ Assign all signals values. Args: item: item can be a dict or an object with a __bundle_assign__ method defined. When item is a dictionary, if "*" is in the dictionary, the value of "*" will be assigned to all signals that are not in the dictionary. Otherwise assign will call the __bundle_assign__ function in item to complete the assignment to the bundle multilevel: When multilevel is true, the subbundle signal values are taken from a secondary dictionary. Otherwise, the dictionary should have only one level, with the sub-bundles separated by dots in keys. """ # Case 1: Item is an object with __bundle_assign__ method if not isinstance(item, dict): if hasattr(item, "__bundle_assign__"): item.__bundle_assign__(self) else: critical( "assign: item must be a dictionary or an object with __bundle_assign__ method" ) return # Case 2: Item is a dictionary if "*" in item: self.set_all(item["*"]) del item["*"] if multilevel: for signal, value in item.items(): if signal in self.current_level_signals: getattr(self, signal).value = value elif any( signal_list[0] == signal for signal_list in self.__all_signal_lists() ): getattr(self, signal).assign(value) elif any( subbundle[0] == signal for subbundle in self.__all_sub_bundles() ): getattr(self, signal).assign( value, multilevel, Bundle.__appended_level_string(level_string, signal), ) elif any( bundle_list[0] == signal for bundle_list in self.__all_bundle_lists() ): getattr(self, signal).assign(value, multilevel) else: full_signal_name = Bundle.__appended_level_string( level_string, signal ) error(f'assign: signal "{full_signal_name}" is not found in bundle') else: for signal, value in item.items(): if signal in self.current_level_signals: getattr(self, signal).value = value elif any( signal_list[0] == signal for signal_list in self.__all_signal_lists() ): getattr(self, signal).assign(value) else: sub_bundle_name = None if "." in signal: sub_bundle_name, sub_bundle_signal = signal.split(".", 1) if sub_bundle_name in [ sub_bundle[0] for sub_bundle in self.__all_sub_bundles() ]: getattr(self, sub_bundle_name).assign( {sub_bundle_signal: value}, multilevel, Bundle.__appended_level_string( level_string, sub_bundle_name ), ) elif any( bundle_list[0] == signal for bundle_list in self.__all_bundle_lists() ): getattr(self, signal).assign(value, multilevel) else: full_signal_name = Bundle.__appended_level_string( level_string, signal ) error( f'assign: signal "{full_signal_name}" is not found in bundle' )
[文档] def detect_connectivity(self, signal_name): """ Detect wether a signal name could be connected to this bundle. Args: bundle: The bundle to connect. signal_name: The name of the signal to connect. Returns: True if the signal name could be connected to the bundle, False otherwise. """ all_signals = [{"name": signal_name, "org_name": signal_name, "signal": None}] all_signals = self.__bind_from_signal_list( all_signals, self.name, [], False, True, None, None ) return len(all_signals) == 0
[文档] def all_signals_rule(self): """ Get all signals rules in the bundle. Returns: A dictionary of all signals rules in the bundle. """ all_signals_rule = {} self.__bind_from_signal_list( [], self.name, [], False, True, None, all_signals_rule ) return all_signals_rule
[文档] def detect_specific_connectivity(self, signal_name, specific_signal): """ Detects whether a signal name can be connected to a specific signal in the bundle Args: signal_name: The name of the signal to connect. specific_signal: The specific signal in this bundle to connect to, format such as "subbundle1.subbundle2.siganlA" """ all_signals = [{"name": signal_name, "org_name": signal_name, "signal": None}] all_signals = self.__bind_from_signal_list( all_signals, "", [], False, True, specific_signal, None ) return len(all_signals) == 0
[文档] @classmethod def from_prefix(cls, prefix="", dut=None): """ Create a bundle from a prefix. Args: prefix: The prefix to match the signals. Returns: A new bundle. """ new_bundle = cls() if dut is not None: for attr_key in dir(dut): if not attr_key.startswith(prefix): continue attr_value = getattr(dut, attr_key) if "XData" not in attr_value.__class__.__name__: continue new_bundle.current_level_signals.append(attr_key[len(prefix) :]) # new_bundle.__set_current_level_signal() new_bundle.__connect_method = PrefixBindMethod(prefix) return new_bundle
[文档] @classmethod def from_regex(cls, regex=r""): """ Create a bundle from a regex. The signals captured in the regex will be used for matching. If there are multiple capture groups, they are concatenated into a string to match. Args: regex: The regex to match the signals. Returns: A new bundle. """ new_bundle = cls() new_bundle.__connect_method = RegexBindMethod(regex) return new_bundle
[文档] @classmethod def from_dict(cls, dict={}): """ Create a bundle from a dictionary. The keys of the dictionary are the names of the signals in the bundle, and the values are the names of the signals in DUT. Args: dict: The dictionary to guide bundle connections Returns: A new bundle. """ new_bundle = cls() new_bundle.__connect_method = DictBindMethod(dict) new_bundle.__check_dict_value(dict) return new_bundle
[文档] @staticmethod def new_class_from_list(signal_list): """ Create a new bundle class with a list of signals quickly. >>> myBundle = Bundle.new_class_from_list(["a", "b", "c"]).from_prefix("io_") Args: signal_list: A list of signals. Returns: A new bundle class. """ class NewBundle(Bundle): signals = signal_list return NewBundle
[文档] @staticmethod def new_class_from_xport(xport): """ Create a new bundle class with an XPort quickly. Args: xport: The XPort to create the bundle from. Returns: A new bundle class. """ return Bundle.new_class_from_list(xport.GetKeys())
[文档] @classmethod def from_xport(cls, xport): """ Create a bundle from an XPort. Args: xport: The XPort to create the bundle from. Returns: A new bundle. """ signal_list = xport.GetKeys() bundle = cls.new_class_from_list(signal_list)() for signal in signal_list: setattr(bundle, signal, xport[signal]) return bundle
def __set_current_level_signal(self): """ Collect all signals in the current level and save them in the current_level_signals. """ self.current_level_signals = [signal for signal in self.signals] for signal in dir(self): if isinstance(getattr(self, signal), Signal): self.current_level_signals.append(signal)
[文档] def all_signals(self, level_string=""): """ Yield all signals of the bundle. Returns: A generator of signal names and signals. """ for signal in self.current_level_signals: yield (Bundle.__appended_level_string(level_string, signal)), getattr( self, signal, None ) for signal_list_name, signal_list in self.__all_signal_lists(): for idx, signal in enumerate(signal_list.signals): yield ( Bundle.__appended_level_string( level_string, f"{signal_list_name}[{idx}]" ), signal, ) for sub_bundle_name, sub_bundle in self.__all_sub_bundles(): yield from sub_bundle.all_signals( Bundle.__appended_level_string(level_string, sub_bundle_name) ) for bundle_list_name, bundle_list in self.__all_bundle_lists(): for idx, bundle in enumerate(bundle_list.bundles): yield from bundle.all_signals( Bundle.__appended_level_string( level_string, f"{bundle_list_name}[{idx}]" ) )
def __getitem__(self, key): """ Get the signal by key. Args: key: The key of the signal. Returns: The signal. """ return getattr(self, key, None) def __add_signal_attr(self, signal_name, signal, info_bundle_name, info_dut_name): """ Add a signal attribute to the bundle and log the connection. Args: signal_name: The name of the signal in the bundle. signal: The signal itself. info_bundle_name: The name of the bundle in the log. info_dut_name: The name of the signal in the DUT in the log. """ setattr(self, signal_name, signal) self.__update_signal_info(signal) info(f'dut\'s signal "{info_dut_name}" is connected to "{info_bundle_name}"') def __update_signal_info(self, signal): """ Update the signal info when a signal is connected to the bundle. """ if self.__clock_event is None: self.__clock_event = signal.event if hasattr(signal.xdata, "number_of_bundles_connected_to"): signal.xdata.number_of_bundles_connected_to += 1 else: signal.xdata.number_of_bundles_connected_to = 1 def __str__(self): item_str = "" signals = ", ".join( [ f"{signal}: {getattr(self, signal)}" for signal in self.current_level_signals ] ) signal_lists = ", ".join( [ f"{signal_list_name}: {signal_list}" for signal_list_name, signal_list in self.__all_signal_lists() ] ) sub_bundles = ", ".join( [ f"{sub_bundle_name}: {sub_bundle}" for sub_bundle_name, sub_bundle in self.__all_sub_bundles() ] ) bundle_lists = ", ".join( [ f"{bundle_list_name}: {bundle_list}" for bundle_list_name, bundle_list in self.__all_bundle_lists() ] ) item_str = signals if signal_lists != "": item_str = item_str + ", " + signal_lists if sub_bundles != "": item_str = item_str + ", " + sub_bundles if bundle_lists != "": item_str = item_str + ", " + bundle_lists return f"{type(self).__name__}({item_str})" def __all_sub_bundles(self): """ Yield all sub-bundles of the bundle. Returns: A generator of tuples (attr_name, sub_bundle) where attr is the name of the sub-bundle and sub_bundle is the sub-bundle itself. """ for attr in dir(self): if isinstance(getattr(self, attr), Bundle): yield (attr, getattr(self, attr)) def __all_signal_lists(self): """ Yield all signal lists of the bundle. Returns: A generator of tuples (attr_name, signal_list) where attr is the name of the signal list and signal_list is the signal list itself. """ for attr in dir(self): if isinstance(getattr(self, attr), SignalList): yield (attr, getattr(self, attr)) def __all_bundle_lists(self): """ Yield all bundle lists of the bundle. Returns: A generator of tuples (attr_name, bundle_list) where attr is the name of the bundle list and bundle_list is the bundle list itself. """ for attr in dir(self): if isinstance(getattr(self, attr), BundleList): yield (attr, getattr(self, attr)) def __detect_missing_signals( self, connected_signals, level_string, rule_stack, unconnected_signal_access ): """ Detect missing signals in the bundle. Log a warning if a signal is not found. When unconnected_signal_access is True, set the unconnected signals as dummy signals. Args: connected_signals: A list of connected signals. level_string: The string of the current level. unconnected_signal_access: Whether unconnected signals could be accessed. """ self._dummy_signal = DummySignal() error_connected_signals = set() for signal in self.current_level_signals: if signal not in connected_signals: rule_string = Bundle.__get_rule_string(rule_stack, signal) error_connected_signals.add(( str(Bundle.__appended_level_string(level_string, signal)), rule_string )) if unconnected_signal_access: setattr(self, signal, self._dummy_signal) for signal_list_name, signal_list in self.__all_signal_lists(): for idx, signal in enumerate(signal_list.names): if signal not in connected_signals: rule_string = Bundle.__get_rule_string(rule_stack, signal) level_string = Bundle.__appended_level_string( level_string, f"{signal_list_name}[{idx}]" ) error_connected_signals.add(( level_string, rule_string )) if unconnected_signal_access: signal_list.signals[idx] = self._dummy_signal if len(error_connected_signals) > 0: error_message = f"Signal bind error, the following signals in bundle {self.__class__.__name__} are not found in dut, make sure the connection rules are correct (signal_name -> search_rule_in_dut): " error_message += ",".join([f"{a} -> {b}" for a, b in error_connected_signals]) raise Exception(error_message) def __remove_signal_attr(self, signal_name): """ Remove a signal attribute from the bundle. Args: signal_name: The name of the signal to remove. """ signal = getattr(self, signal_name) if hasattr(signal.xdata, "number_of_bundles_connected_to"): signal.xdata.number_of_bundles_connected_to -= 1 if signal.xdata.number_of_bundles_connected_to == 0: delattr(signal.xdata, "number_of_bundles_connected_to") delattr(self, signal_name) def __unbind_all(self): """ Unbind all signals to the bundle. """ for signal_name in self.current_level_signals: if hasattr(self, signal_name): self.__remove_signal_attr(signal_name) for _, sub_bundle in self.__all_sub_bundles(): sub_bundle.__unbind_all() def __bind_from_signal_list( self, all_signals, level_string, rule_stack, unconnected_signal_access, detection_mode, specific_signal, all_signals_rule, ): """ Bind the signals to the bundle. Args: all_signals: A list of signals to bind. level_string: The string of the current level. rule_stack: The stack of rules to bind the signals, this is a list of bundles. unconnected_signal_access: Whether unconnected signals could be accessed. detection_mode: Whether the method is in detection mode. if it is, the bundle will not be connected to the signals. specific_signal: It is only valid in detection mode. If it is not None, only the specific signal will be connected. all_signals_rule: It is only valid in detection mode. If it is not None, the rule of all signals will be stored in this dictionary. Returns: A list of signals that are not matched. """ rule_stack = rule_stack + [self] connected_signals, matching_signals, remain_signals = ( self.__connect_method.bind(self, all_signals, level_string, detection_mode) ) if not detection_mode: self.__detect_missing_signals( connected_signals, level_string, rule_stack, unconnected_signal_access ) if specific_signal is not None: error("specific signal could only be used in detection mode") else: if specific_signal is not None: remain_signals += connected_signals connected_signals = [] for signal in remain_signals: full_signal_name = Bundle.__appended_level_string( level_string, signal["name"] ) if full_signal_name == specific_signal: connected_signals.append(signal) remain_signals.remove(signal) if all_signals_rule is not None: for signal in self.current_level_signals: full_signal_name = Bundle.__appended_level_string( level_string, signal ) rule_string = Bundle.__get_rule_string(rule_stack, signal) all_signals_rule[full_signal_name] = rule_string for signal_list_name, signal_list in self.__all_signal_lists(): for idx, signal in enumerate(signal_list.names): full_signal_name = Bundle.__appended_level_string( level_string, f"{signal_list_name}[{idx}]" ) rule_string = Bundle.__get_rule_string(rule_stack, signal) all_signals_rule[full_signal_name] = rule_string # Bind the remain signals to the sub-bundles for sub_bundle_name, sub_bundle in self.__all_sub_bundles(): matching_signals = sub_bundle.__bind_from_signal_list( matching_signals, Bundle.__appended_level_string(level_string, sub_bundle_name), rule_stack, unconnected_signal_access, detection_mode, specific_signal, all_signals_rule, ) if sub_bundle.__clock_event is not None: self.__clock_event = sub_bundle.__clock_event for bundle_list_name, bundle_list in self.__all_bundle_lists(): for idx, bundle in enumerate(bundle_list.bundles): matching_signals = bundle.__bind_from_signal_list( matching_signals, Bundle.__appended_level_string( level_string, f"{bundle_list_name}[{idx}]" ), rule_stack, unconnected_signal_access, detection_mode, specific_signal, all_signals_rule, ) Bundle.__revert_signal_name(matching_signals, all_signals) return matching_signals + remain_signals def __check_dict_value(self, dict): """ Check if the values of the dictionary are valid. Args: dict: The dictionary to check. """ signal_list = [ {"name": value, "org_name": key, "signal": None} for key, value in dict.items() ] unconnected_signals = self.__bind_from_signal_list( signal_list, self.name, [], False, True, None, None ) unconnected_signal_names = [ signal["org_name"] for signal in unconnected_signals ] if len(unconnected_signal_names) > 0: warning( f"The signal names {unconnected_signal_names} in {type(self).__name__}'s connection dictionary " "are invalid, because they cannot match any signals from the Bundle" )
[文档] @staticmethod def detect_unconnected_signals(dut): """ Detect signals that are not connected to any bundle. Args: dut: The dut to detect. Returns: A list of unconnected signals """ unconnected_signals = [] for signal_info in Bundle.dut_all_signals(dut): signal = signal_info["signal"] if ( not hasattr(signal, "number_of_bundles_connected_to") or signal.number_of_bundles_connected_to == 0 ): unconnected_signals.append(signal_info["org_name"]) return unconnected_signals
[文档] @staticmethod def detect_multiple_connections(dut): """ Detect signals that are connected to multiple bundles. Args: dut: The dut to detect. Returns: A list of signals that are connected to multiple bundles. """ multiple_connections = [] for signal_info in Bundle.dut_all_signals(dut): signal = signal_info["signal"] if ( hasattr(signal, "number_of_bundles_connected_to") and signal.number_of_bundles_connected_to > 1 ): multiple_connections.append(signal_info["org_name"]) return multiple_connections
@staticmethod def __appended_level_string(level_string, level): """ Append a string to the current level string. Args: level_string: The current level string. level: The string to append. Returns: The appended string. """ if level_string == "": return level else: return f"{level_string}.{level}" @staticmethod def __revert_signal_name(signal_list, last_signal_list): """ Change the item's name in signal_list to the item's name in last_signal_list, if it has the same original name. Args: signal_list: A list of signals to revert. this is a list of dictionaries containing the name, original name, and signal pin of the signal. last_signal_list: The structure of the last signal list is the same as the signal_list. """ for signal in signal_list: for last_signal in last_signal_list: if signal["org_name"] == last_signal["org_name"]: signal["name"] = last_signal["name"] @staticmethod def __is_instance_of_xpin(signal): """ Check if the signal is an instance of XPin. XPin is a class from dut generated by picker that represents a signal in a simulation. Args: signal: The signal to check. Returns: True if the signal is an instance of XPin, False otherwise. """ return ( signal is not None and not isinstance(signal, DummySignal) and hasattr(signal, "xdata") and hasattr(signal, "event") )
[文档] @staticmethod def dut_all_signals(dut): """ Yield all signals of the dut. Args: dut: The dut to get signals from. Returns: A generator of dictionaries containing the name, original name, and signal of the signal. """ for attr in dir(dut): signal = getattr(dut, attr) if not callable(signal) and Bundle.__is_instance_of_xpin(signal): yield {"name": attr, "org_name": attr, "signal": signal}
@staticmethod def __get_rule_string(rule_stack, signal): """ Get the rule string from the rule stack. Args: rule_stack: The stack of rules, this is a list of bundles. signal: The signal to get the rule string. Returns: The rule string. """ rule_string = "" rule_stack = rule_stack + [signal] for index, rule in enumerate(rule_stack): # Prefix if rule is rule_stack[-1] or rule.__connect_method.method == "prefix": rule_string += ( rule if rule is rule_stack[-1] else rule.__connect_method.method_value ) # Dict elif rule.__connect_method.method == "dict": relative_signal_name = Bundle.__get_path_from_rule_stack( rule_stack[index:] ) flitered_dict = rule_stack[index].__filter_signals_in_dict( rule.__connect_method.method_value, relative_signal_name ) dict_string = f"<No Matching Signal in Dict>" if len(flitered_dict) > 0: dict_string = "|".join( [f"{value}" for _, value in flitered_dict.items()] ) if len(flitered_dict) > 1: dict_string = f"({dict_string})" rule_string += dict_string break # Regex elif rule.__connect_method.method == "regex": rule_string += f"{rule.__connect_method.method_value} -> " else: raise ValueError(f"rule must be 'dict', 'prefix', or 'regex'") return rule_string def __filter_signals_in_dict(self, dict, could_connected_to): """ Filter signals in the dictionary that could be connected to a specific signal. Args: dict: The dictionary to filter. could_connected_to: The specific signal to connect to. Returns: A dictionary of signals that could be connected to the specific signal. """ filtered_dict = {} for key, value in dict.items(): if self.detect_specific_connectivity(key, could_connected_to): filtered_dict[key] = value return filtered_dict @staticmethod def __get_path_from_rule_stack(rule_stack, signal=""): """ Get siganl path from the rule stack. Args: rule_stack: The stack of rules, this is a list of bundles. signal: The signal to get the path. """ path = "" if signal != "": rule_stack = rule_stack + [signal] for rule in rule_stack[1:]: if rule is rule_stack[-1]: path += rule else: path += rule.name + "." return path