CPU/CPU.py
2024-05-07 16:50:16 +08:00

132 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import queue
from Memory import Memory
from error import *
import pathlib
import warnings
import program
import importlib
class Core:
def __init__(self, id, registers: int, dtMemory: int, cmdMemory: int):
self.id = id
# 创建存储器
self.registers = Memory(registers)
self.dtMemory = Memory(dtMemory)
self.cmdMemory = Memory(cmdMemory)
# 设置存储器
self.registers.name = f"寄存器 - Core{id}"
self.dtMemory.name = f"数据内存 - Core{id}"
self.cmdMemory.name = f"程序内存 - Core{id}"
self.pc = 0 # 程序计数器
self.stack = queue.LifoQueue(maxsize=10 * self.registers.size) # 栈
self.ASMs = {}
for file in pathlib.Path("ASM").iterdir():
if file.is_dir():
continue
if str(file).split('.')[-1] not in ['py', 'pyc', 'pyd', 'so']:
continue
module = "ASM." + file.name.split(".")[0]
module = importlib.import_module(module)
for cmd in module.CMDs:
self.ASMs[cmd] = module
def loadProgram(self, code):
# 加载程序至cmdMemroy
self.cmdMemory.write_batch(0, code)
def fetchInstruction(self):
# 取指令
if self.pc < len(self.cmdMemory):
instruction = self.cmdMemory[self.pc]
self.pc += 1
return instruction
else:
return None # 返回None表示程序已经执行完毕
def init(self):
# 初始化内存和寄存器
self.pc = 0
self.dtMemory.init()
self.registers.init()
def executeInstruction(self, instruction):
# 执行指令
if instruction is None:
return False # 程序已执行完毕返回False
lists = instruction.split(" ")
# 解析指令
opcode = lists[0] # 操作码
operand = program.Operand(lists[1:], self) # 操作数
if opcode in self.ASMs:
ins = getattr(self.ASMs[opcode], opcode)
ret = ins(self, operand)
if ret is None:
return True
elif ret is False:
return False
else:
return True
else:
raise InvalidInstruction(instruction)
def run(self):
# 开始执行
while True:
instruction = self.fetchInstruction()
if not self.executeInstruction(instruction):
break
class MultiCoreCPU:
def __init__(self, num_cores, registers, dt_memory, cmd_memory_size=-1):
warnings.warn("该类型CPU有BUG")
self.cores = []
self.programs = queue.Queue(maxsize=num_cores)
if cmd_memory_size <= 0:
cmd_memory_size = dt_memory.size
for i in range(num_cores):
cmdMem = Memory(cmd_memory_size)
core = Core(i, registers, dt_memory, cmdMem)
self.cores.append(core)
def load_program(self, program):
self.programs.put_nowait(program)
def run(self):
oldCore = -1
while not self.programs.empty():
oldCore += 1
self.cores[oldCore].loadProgram(self.programs.get())
def halt(self):
for core in self.cores:
core.is_running = False
class CPU(Core):
def __init__(self, registers, dtMemory, cmdMemory):
super().__init__(0, registers, dtMemory, cmdMemory)
if __name__ == "__main__":
cpu = CPU(registers=64, dtMemory=1024, cmdMemory=10)
program.Operand(["r0"], cpu)[0].set(2)
f = open('Example/加法器.txt')
code = f.read().split('\n')
cpu.loadProgram(code)
cpu.dtMemory.write_batch(0, [0, 1])
cpu.run()
# print(cpu.registers[0])