CPU/CPU.py
2024-04-29 16:01:52 +08:00

132 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import queue
from Memory import Memory
import pathlib
import program
import importlib
class Core:
    """A single CPU core with its own registers, data memory and program memory.

    Instruction handlers are discovered at construction time from the Python
    modules found in the ``ASM`` directory (relative to the current working
    directory).  Each module lists the opcodes it implements in ``CMDs`` and
    provides, for every opcode, a callable of the same name that is invoked as
    ``handler(core, operands)``.
    """

    def __init__(self, id, registers: int, dtMemory: int, cmdMemory: int):
        """Create the memories and register every available instruction handler.

        Args:
            id: Identifier of this core; used to label its memories.  (The
                name shadows the builtin but is kept for caller compatibility.)
            registers: Value passed to ``Memory`` to build the register file.
            dtMemory: Value passed to ``Memory`` to build the data memory.
            cmdMemory: Value passed to ``Memory`` to build the program memory.
        """
        self.id = id
        # Cleared by MultiCoreCPU.halt(); run() checks it before every fetch.
        self.is_running = False
        # Create the memories.
        self.registers = Memory(registers)
        self.dtMemory = Memory(dtMemory)
        self.cmdMemory = Memory(cmdMemory)
        # Label the memories.
        self.registers.name = f"寄存器 - Core{id}"
        self.dtMemory.name = f"数据内存 - Core{id}"
        self.cmdMemory.name = f"程序内存 - Core{id}"
        self.pc = 0  # program counter
        self.stack = queue.LifoQueue(maxsize=10 * self.registers.size)  # stack
        self.ASMModule = []
        self.ASMs = {}
        # Sorted so that, should two modules claim the same opcode, the winner
        # does not depend on directory enumeration order.
        for file in sorted(pathlib.Path("ASM").iterdir()):
            # Only Python sources are importable handler modules; skip
            # directories (e.g. __pycache__) and stray non-Python files.
            if file.is_dir() or file.suffix != ".py":
                continue
            module = importlib.import_module("ASM." + file.name.split(".")[0])
            # A module without CMDs simply contributes no opcodes.
            for cmd in getattr(module, "CMDs", ()):
                self.ASMs[cmd] = module

    def loadProgram(self, code):
        """Copy ``code`` (a sequence of instructions) to the start of program memory.

        The program counter is not modified; call ``init()`` to reset it.
        """
        print("加载长度为", len(code), "的程序")
        self.cmdMemory[: len(code)] = code

    def fetchInstruction(self):
        """Return the instruction at ``pc`` and advance ``pc``.

        Returns:
            The fetched instruction, or ``None`` once ``pc`` has moved past
            the end of program memory.
        """
        if self.pc >= len(self.cmdMemory):
            return None  # nothing left to execute
        instruction = self.cmdMemory[self.pc]
        self.pc += 1
        return instruction

    def init(self):
        """Reset the program counter and re-initialise data memory and registers."""
        self.pc = 0
        self.dtMemory.init()
        self.registers.init()

    def executeInstruction(self, instruction):
        """Execute one textual instruction of the form ``"opcode operand ..."``.

        Tokens may be separated by any run of whitespace, so repeated spaces,
        tabs and a trailing ``"\\r"`` from CRLF files are tolerated.

        Args:
            instruction: The instruction text, or ``None`` when the program
                has run off the end of program memory.

        Returns:
            ``False`` if execution must stop (``instruction`` is ``None`` or
            the opcode is ``halt``), ``True`` otherwise.
        """
        if instruction is None:
            return False  # program finished
        tokens = instruction.split()
        if not tokens:
            return True  # blank line: nothing to do, keep running
        opcode, *operand = tokens
        if opcode in self.ASMs:
            handler = getattr(self.ASMs[opcode], opcode)
            handler(self, operand)
        if opcode == "prt":
            # Debug helper: print the first operand (an empty line if absent).
            print(operand[0] if operand else "")
        elif opcode == "halt":
            return False
        return True

    def write_to_memory(self, address, data):
        """Store ``data`` at ``address`` in data memory."""
        self.dtMemory[address] = data

    def run(self):
        """Fetch and execute instructions until the program stops.

        The loop ends when ``executeInstruction`` returns ``False`` or when
        ``is_running`` is cleared from outside (e.g. ``MultiCoreCPU.halt``).
        """
        self.is_running = True
        try:
            while self.is_running:
                if not self.executeInstruction(self.fetchInstruction()):
                    break
        finally:
            self.is_running = False
class MultiCoreCPU:
    """A group of ``Core`` objects plus a bounded queue of pending programs."""

    def __init__(self, num_cores, registers, dt_memory, cmd_memory_size=-1):
        """Create ``num_cores`` cores and an empty program queue.

        Args:
            num_cores: Number of cores to create; also the queue capacity.
            registers: Forwarded unchanged to every ``Core``.
            dt_memory: Forwarded unchanged to every ``Core``; its ``size``
                attribute is read when ``cmd_memory_size`` is not positive.
            cmd_memory_size: Size of each core's program memory; a value
                ``<= 0`` means "use ``dt_memory.size``".
        """
        self.cores = []
        self.programs = queue.Queue(maxsize=num_cores)
        if not cmd_memory_size > 0:
            cmd_memory_size = dt_memory.size
        # NOTE(review): Core annotates its memory arguments as ints and wraps
        # them in Memory itself, yet a Memory instance is handed over here
        # (and dt_memory is expected to expose ``.size``) -- confirm that
        # Memory accepts another Memory as its constructor argument.
        self.cores.extend(
            Core(index, registers, dt_memory, Memory(cmd_memory_size))
            for index in range(num_cores)
        )

    def load_program(self, program):
        """Queue ``program`` without blocking; raises ``queue.Full`` when full."""
        self.programs.put(program, block=False)

    def run(self):
        """Hand each queued program to the next core, in core order.

        Only ``loadProgram`` is called on the cores here; nothing is executed.
        """
        next_core = 0
        while not self.programs.empty():
            pending = self.programs.get()
            self.cores[next_core].loadProgram(pending)
            next_core += 1

    def halt(self):
        """Set ``is_running = False`` on every core."""
        for each_core in self.cores:
            each_core.is_running = False
class CPU(Core):
    """Single-core convenience wrapper: a ``Core`` whose id is always 0."""

    def __init__(self, registers, dtMemory, cmdMemory):
        """Initialise the underlying ``Core`` with id 0 and the given sizes."""
        super().__init__(
            id=0,
            registers=registers,
            dtMemory=dtMemory,
            cmdMemory=cmdMemory,
        )
if __name__ == "__main__":
    # Manual smoke test: build a single-core CPU, touch a register and two
    # data-memory cells, then print register 0.
    cpu = CPU(registers=64, dtMemory=1024, cmdMemory=1024)

    # Presumably resolves "r0" to a register handle and stores 2 in it --
    # verify against program.Operand.
    operands = program.Operand(["r0"], cpu)
    operands[0].set(2)

    # Disabled: load a program from a text file, one instruction per line.
    # f = open('Example/加法器.txt')
    # code = f.read().split('\n')
    # cpu.loadProgram(code)

    for address in (0, 1):
        cpu.write_to_memory(address, 1)

    # Disabled: execute the loaded program.
    # cpu.run()

    print(cpu.registers[0])