From aed1e3f6f294f1ef9a916077d02243e962bd4581 Mon Sep 17 00:00:00 2001 From: root <1561515308@qq.com> Date: Mon, 29 Apr 2024 16:00:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9F=BA=E7=A1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ASM/Jump.py | 10 ++++ ASM/Logical.py | 11 ++++ ASM/Memory.py | 21 ++++++++ ASM/count.py | 20 +++++++ ASM/debug.py | 19 +++++++ CPU.py | 131 +++++++++++++++++++++++++++++++++++++++++++++ Device/Disk.py | 41 ++++++++++++++ Device/QTScreen.py | 71 ++++++++++++++++++++++++ Example/加法器.txt | 5 ++ Memory.py | 53 ++++++++++++++++++ error.py | 2 + program.py | 73 +++++++++++++++++++++++++ 12 files changed, 457 insertions(+) create mode 100644 ASM/Jump.py create mode 100644 ASM/Logical.py create mode 100644 ASM/Memory.py create mode 100644 ASM/count.py create mode 100644 ASM/debug.py create mode 100644 CPU.py create mode 100644 Device/Disk.py create mode 100644 Device/QTScreen.py create mode 100644 Example/加法器.txt create mode 100644 Memory.py create mode 100644 error.py create mode 100644 program.py diff --git a/ASM/Jump.py b/ASM/Jump.py new file mode 100644 index 0000000..0a6c242 --- /dev/null +++ b/ASM/Jump.py @@ -0,0 +1,10 @@ +CMDs = ["jmp", "jmp_if"] + + +def jmp(self, operand): + self.pc = int(operand[0]) + + +def jmp_if(self, operand): + if int(operand[1]) == 1: + self.pc = int(operand[0]) diff --git a/ASM/Logical.py b/ASM/Logical.py new file mode 100644 index 0000000..e6e0672 --- /dev/null +++ b/ASM/Logical.py @@ -0,0 +1,11 @@ +CMDs = ["eq"] # , 'en', 'gt', 'lt', 'ge', 'le'] + + +def eq(self, operand): + # 是否等于 + a = self.registers[int(operand[0])] + b = self.registers[int(operand[1])] + if a == b: + self.registers[int(operand[2])] = 1 + else: + self.registers[int(operand[2])] = 0 diff --git a/ASM/Memory.py b/ASM/Memory.py new file mode 100644 index 0000000..70d7e77 --- /dev/null +++ b/ASM/Memory.py @@ -0,0 +1,21 @@ +# pylint:disable=W0622 +CMDs = ["ldr", "str"] + + +def ldr(self, operand): + print("从内存的", operand[1], "处加载至寄存器", operand[0]) + if int(operand[1]) < len(self.dtMemory): + value = self.dtMemory[int(operand[1])] + self.registers[int(operand[0])] = value + else: + print("ERROR: Attempted to load from invalid memory address.") + + +def str(self, operand): + # 往内存写入数据 + if int(operand[1]) < len(self.dtMemory): + value = self.registers[int(operand[0])] + print(value) + self.dtMemory[int(operand[1])] = value + else: + print("ERROR: Attempted to store to invalid memory address.") diff --git a/ASM/count.py b/ASM/count.py new file mode 100644 index 0000000..2d13d9f --- /dev/null +++ b/ASM/count.py @@ -0,0 +1,20 @@ +CMDs = ["add", "sub", "mul", "div"] + + +def add(self, operand): + self.registers[int(operand[0])] += self.registers[int(operand[1])] + + +def sub(self, operand): + self.registers[int(operand[0])] -= self.registers[int(operand[1])] + + +def mul(self, operand): + self.registers[int(operand[0])] *= self.registers[int(operand[1])] + + +def div(self, operand): + if self.registers[int(operand[1])] != 0: + self.registers[int(operand[0])] //= self.registers[int(operand[1])] + else: + raise ZeroDivisionError() diff --git a/ASM/debug.py b/ASM/debug.py new file mode 100644 index 0000000..6b302f4 --- /dev/null +++ b/ASM/debug.py @@ -0,0 +1,19 @@ +CMDs = ["prt", "prtr", "prtm"] + + +def prt(self, operand): + print(operand[0]) + + +def prtr(self, operand): + if len(operand) > 0: + print(self.registers[int(operand[0])], end="") + else: + print(self.registers, end="") + + +def prtm(self, operand): + if len(operand) > 0: + print(self.dtMemory[int(operand[0])], end="") + else: + print(self.dtMemory, end="") diff --git a/CPU.py b/CPU.py new file mode 100644 index 0000000..4ba5cd8 --- /dev/null +++ b/CPU.py @@ -0,0 +1,131 @@ +import queue +from Memory import Memory +import pathlib +import program +import importlib + + +class Core: + def __init__(self, id, registers: int, dtMemory: int, cmdMemory: int): + self.id = id + # 创建存储器 + self.registers = Memory(registers) + self.dtMemory = Memory(dtMemory) + self.cmdMemory = Memory(cmdMemory) + + # 设置存储器 + self.registers.name = f"寄存器 - Core{id}" + self.dtMemory.name = f"数据内存 - Core{id}" + self.cmdMemory.name = f"程序内存 - Core{id}" + + self.pc = 0 # 程序计数器 + self.stack = queue.LifoQueue(maxsize=10 * self.registers.size) # 栈 + + self.ASMModule = [] + self.ASMs = {} + + for file in pathlib.Path("ASM").iterdir(): + if file.is_dir(): + continue + module = "ASM." + file.name.split(".")[0] + module = importlib.import_module(module) + for cmd in module.CMDs: + self.ASMs[cmd] = module + + def loadProgram(self, code): + # 加载程序至cmdMemroy + print("加载长度为", len(code), "的程序") + self.cmdMemory[: len(code)] = code + + def fetchInstruction(self): + # 取指令 + if self.pc < len(self.cmdMemory): + instruction = self.cmdMemory[self.pc] + self.pc += 1 + return instruction + else: + return None # 返回None表示程序已经执行完毕 + + def init(self): + # 初始化内存和寄存器 + self.pc = 0 + self.dtMemory.init() + self.registers.init() + + def executeInstruction(self, instruction): + # 执行指令 + if instruction is None: + return False # 程序已执行完毕,返回False + + lists = instruction.split(" ") + + # 解析指令 + opcode = lists[0] # 操作码 + operand = lists[1:] # 操作数 + + if opcode in self.ASMs: + ins = getattr(self.ASMs[opcode], opcode) + ins(self, operand) + + if opcode == "prt": + # 调试函数,输出文字 + print(operand[0]) + + elif opcode == "halt": # HALT + # 结束 + return False + return True + + def write_to_memory(self, address, data): + # 往内存写入数据 + self.dtMemory[address] = data + + def run(self): + # 开始执行 + while True: + instruction = self.fetchInstruction() + if not self.executeInstruction(instruction): + break + + +class MultiCoreCPU: + def __init__(self, num_cores, registers, dt_memory, cmd_memory_size=-1): + self.cores = [] + + self.programs = queue.Queue(maxsize=num_cores) + if cmd_memory_size <= 0: + cmd_memory_size = dt_memory.size + for i in range(num_cores): + cmdMem = Memory(cmd_memory_size) + core = Core(i, registers, dt_memory, cmdMem) + self.cores.append(core) + + def load_program(self, program): + self.programs.put_nowait(program) + + def run(self): + oldCore = -1 + while not self.programs.empty(): + oldCore += 1 + self.cores[oldCore].loadProgram(self.programs.get()) + + def halt(self): + for core in self.cores: + core.is_running = False + + +class CPU(Core): + def __init__(self, registers, dtMemory, cmdMemory): + super().__init__(0, registers, dtMemory, cmdMemory) + + +if __name__ == "__main__": + cpu = CPU(registers=64, dtMemory=1024, cmdMemory=1024) + program.Operand(["r0"], cpu)[0].set(2) + # f = open('Example/加法器.txt') + # code = f.read().split('\n') + # cpu.loadProgram(code) + cpu.write_to_memory(0, 1) + cpu.write_to_memory(1, 1) + # cpu.run() + print(cpu.registers[0]) diff --git a/Device/Disk.py b/Device/Disk.py new file mode 100644 index 0000000..994c2f1 --- /dev/null +++ b/Device/Disk.py @@ -0,0 +1,41 @@ +import json +import sqlite3 +from Memory import Memory + + +class Storage(Memory): + def __init__(self, size): + super().__init__(1) + self.size = size + self.conn = sqlite3.connect("Storage.se") + self.create_table() + + def create_table(self): + cursor = self.conn.cursor() + cursor.execute( + """CREATE TABLE IF NOT EXISTS KeyValue ( + key INTEGER PRIMARY KEY, + value TEXT + )""" + ) + self.conn.commit() + + def __setitem__(self, key, value): + cursor = self.conn.cursor() + value = json.dumps([value]) + cursor.execute( + """ + INSERT INTO KeyValue (key, value) VALUES (?, ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value + """, + (key, value), + ) + self.conn.commit() + + def __getitem__(self, key): + cursor = self.conn.cursor() + cursor.execute("SELECT value FROM KeyValue WHERE key = ?", (key,)) + result = cursor.fetchone() + if result: + return json.loads(result[0])[0] # 返回 value + return 0 diff --git a/Device/QTScreen.py b/Device/QTScreen.py new file mode 100644 index 0000000..a46def0 --- /dev/null +++ b/Device/QTScreen.py @@ -0,0 +1,71 @@ +import tkinter + + +class DynamicPixelDisplay: + def __init__(self, width, height, pixel_size): + self.CANVAS_WIDTH = width + self.CANVAS_HEIGHT = height + self.PIXEL_SIZE = pixel_size + + # 创建主窗口 + self.root = tkinter.Tk() + self.root.title("Dynamic Pixel Display") + + # 创建画布 + self.canvas = tkinter.Canvas( + self.root, width=self.CANVAS_WIDTH, height=self.CANVAS_HEIGHT, bg="white" + ) + self.canvas.pack() + + # 创建一个二维列表来表示像素颜色,默认为白色 + self.pixels = [] + for y in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE): + row = [] + for x in range(self.CANVAS_WIDTH // self.PIXEL_SIZE): + rect = self.canvas.create_rectangle( + x * self.PIXEL_SIZE, + y * self.PIXEL_SIZE, + (x + 1) * self.PIXEL_SIZE, + (y + 1) * self.PIXEL_SIZE, + fill="white", + ) + row.append(rect) + self.pixels.append(row) + + # 设置初始屏幕内容 + self.initial_screen_content = [] + for _ in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE): + row = ["white"] * (self.CANVAS_WIDTH // self.PIXEL_SIZE) + self.initial_screen_content.append(row) + + def run(self): + self.root.mainloop() + + # 修改像素颜色的方法 + def set_pixel_color(self, x, y, color): + if ( + 0 <= x < self.CANVAS_WIDTH // self.PIXEL_SIZE + and 0 <= y < self.CANVAS_HEIGHT // self.PIXEL_SIZE + ): + self.canvas.itemconfig(self.pixels[y][x], fill=color) + + # 清空屏幕的方法 + def clear_screen(self): + for y in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE): + for x in range(self.CANVAS_WIDTH // self.PIXEL_SIZE): + self.set_pixel_color(x, y, "white") + + # 用二维数组更新屏幕内容的方法 + def update_screen(self, screen_content): + self.clear_screen() # 首先清空屏幕 + # 根据二维数组的值更新屏幕内容 + for y in range(len(screen_content)): + for x in range(len(screen_content[0])): + color = screen_content[y][x] + if isinstance(color, str): + self.set_pixel_color(x, y, color) + + +# 实例化DynamicPixelDisplay类 +display = DynamicPixelDisplay(400, 400, 10) +display.run() diff --git a/Example/加法器.txt b/Example/加法器.txt new file mode 100644 index 0000000..e1c0652 --- /dev/null +++ b/Example/加法器.txt @@ -0,0 +1,5 @@ +ldr 0 0 +ldr 1 1 +add 0 1 +prtr 0 +halt \ No newline at end of file diff --git a/Memory.py b/Memory.py new file mode 100644 index 0000000..0234a68 --- /dev/null +++ b/Memory.py @@ -0,0 +1,53 @@ +from error import * + + +class Memory: + def __init__(self, size): + self.data = [0] * size + self.size = size + self.name = "内存" + + def init(self): + print("重置" + self.name) + self.data = [0] * self.size + + def __getitem__(self, key: int): + print("读取位于", key, "的" + self.name) + if key > self.size: + raise AddressError + return self.data[key] + + def write(self, index: int, val: int): + if index > self.size: + raise AddressError() + self.data[index] = val + + def __len__(self): + return self.size + + def __str__(self): + return str(self.data) + + def __setitem__(self, k, v): + print("写入位于", k, "的" + self.name) + + if isinstance(k, slice): + if not k.start: + start = 0 + else: + start = k.start + if not k.stop: + stop = self.size - 1 + else: + stop = k.stop + + if start > self.size or stop > self.size: + raise AddressError + + self.data[start:stop] = v + return + + if k > self.size: + raise AddressError + self.data[k] = v + return diff --git a/error.py b/error.py new file mode 100644 index 0000000..bca8fb4 --- /dev/null +++ b/error.py @@ -0,0 +1,2 @@ +class AddressError(Exception): + pass diff --git a/program.py b/program.py new file mode 100644 index 0000000..0602733 --- /dev/null +++ b/program.py @@ -0,0 +1,73 @@ +from error import * + + +class Program: + def __init__(self, code: str): + self.codelist = code.split("\n") + + +class BaseArg: + def __init__(self, tp, val): + self.type = tp + self._val = val + + def isRegister(self): + return self.type == "reg" + + def isMemory(self): + return self.type == "mem" + + def isImmediate(self): + return self.type == "int" + + +class RegisterArg(BaseArg): + def __init__(self, n, cpu): + super().__init__("reg", n) + if n > len(cpu.registers): + raise AddressError("寄存器不存在:" + str(n)) + self.cpu = cpu + + def set(self, val: int): + self.cpu.registers.write(self._val, val) + + def get(self): + return self.cpu.registers[self._val] + + +class MemoryArg(BaseArg): + def __init__(self, n, cpu): + super().__init__("mem", n) + if n > len(cpu.dtMemory): + raise AddressError("内存地址不存在:" + str(n)) + self.cpu = cpu + + def set(self, val: int): + self.cpu.dtMemory.write(self._val, val) + + def get(self): + return self.cpu.dtMemory[self._val] + + +class ImmediateArg(BaseArg): + def __init__(self, val): + super().__init__("int", val) + + def get(self): + return self._val + + +class Operand: + def __init__(self, operList, cpu): + self.List = [] + for i in operList: + if i[0] == "r": + self.List.append(RegisterArg(int(i[1:]), cpu)) + elif i[0:1] == "0x": + index = int(i, 16) + self.List.append(MemoryArg(index, cpu)) + elif i.isdigit(): + self.List.append(ImmediateArg(int(i))) + + def __getitem__(self, key: int): + return self.List[key]