实现基础

This commit is contained in:
root 2024-04-29 16:00:03 +08:00
parent 67d4309882
commit aed1e3f6f2
12 changed files with 457 additions and 0 deletions

10
ASM/Jump.py Normal file
View File

@ -0,0 +1,10 @@
CMDs = ["jmp", "jmp_if"]
def jmp(self, operand):
self.pc = int(operand[0])
def jmp_if(self, operand):
if int(operand[1]) == 1:
self.pc = int(operand[0])

11
ASM/Logical.py Normal file
View File

@ -0,0 +1,11 @@
CMDs = ["eq"] # , 'en', 'gt', 'lt', 'ge', 'le']
def eq(self, operand):
# 是否等于
a = self.registers[int(operand[0])]
b = self.registers[int(operand[1])]
if a == b:
self.registers[int(operand[2])] = 1
else:
self.registers[int(operand[2])] = 0

21
ASM/Memory.py Normal file
View File

@ -0,0 +1,21 @@
# pylint:disable=W0622
CMDs = ["ldr", "str"]
def ldr(self, operand):
print("从内存的", operand[1], "处加载至寄存器", operand[0])
if int(operand[1]) < len(self.dtMemory):
value = self.dtMemory[int(operand[1])]
self.registers[int(operand[0])] = value
else:
print("ERROR: Attempted to load from invalid memory address.")
def str(self, operand):
# 往内存写入数据
if int(operand[1]) < len(self.dtMemory):
value = self.registers[int(operand[0])]
print(value)
self.dtMemory[int(operand[1])] = value
else:
print("ERROR: Attempted to store to invalid memory address.")

20
ASM/count.py Normal file
View File

@ -0,0 +1,20 @@
CMDs = ["add", "sub", "mul", "div"]
def add(self, operand):
self.registers[int(operand[0])] += self.registers[int(operand[1])]
def sub(self, operand):
self.registers[int(operand[0])] -= self.registers[int(operand[1])]
def mul(self, operand):
self.registers[int(operand[0])] *= self.registers[int(operand[1])]
def div(self, operand):
if self.registers[int(operand[1])] != 0:
self.registers[int(operand[0])] //= self.registers[int(operand[1])]
else:
raise ZeroDivisionError()

19
ASM/debug.py Normal file
View File

@ -0,0 +1,19 @@
CMDs = ["prt", "prtr", "prtm"]
def prt(self, operand):
print(operand[0])
def prtr(self, operand):
if len(operand) > 0:
print(self.registers[int(operand[0])], end="")
else:
print(self.registers, end="")
def prtm(self, operand):
if len(operand) > 0:
print(self.dtMemory[int(operand[0])], end="")
else:
print(self.dtMemory, end="")

131
CPU.py Normal file
View File

@ -0,0 +1,131 @@
import queue
from Memory import Memory
import pathlib
import program
import importlib
class Core:
def __init__(self, id, registers: int, dtMemory: int, cmdMemory: int):
self.id = id
# 创建存储器
self.registers = Memory(registers)
self.dtMemory = Memory(dtMemory)
self.cmdMemory = Memory(cmdMemory)
# 设置存储器
self.registers.name = f"寄存器 - Core{id}"
self.dtMemory.name = f"数据内存 - Core{id}"
self.cmdMemory.name = f"程序内存 - Core{id}"
self.pc = 0 # 程序计数器
self.stack = queue.LifoQueue(maxsize=10 * self.registers.size) # 栈
self.ASMModule = []
self.ASMs = {}
for file in pathlib.Path("ASM").iterdir():
if file.is_dir():
continue
module = "ASM." + file.name.split(".")[0]
module = importlib.import_module(module)
for cmd in module.CMDs:
self.ASMs[cmd] = module
def loadProgram(self, code):
# 加载程序至cmdMemroy
print("加载长度为", len(code), "的程序")
self.cmdMemory[: len(code)] = code
def fetchInstruction(self):
# 取指令
if self.pc < len(self.cmdMemory):
instruction = self.cmdMemory[self.pc]
self.pc += 1
return instruction
else:
return None # 返回None表示程序已经执行完毕
def init(self):
# 初始化内存和寄存器
self.pc = 0
self.dtMemory.init()
self.registers.init()
def executeInstruction(self, instruction):
# 执行指令
if instruction is None:
return False # 程序已执行完毕返回False
lists = instruction.split(" ")
# 解析指令
opcode = lists[0] # 操作码
operand = lists[1:] # 操作数
if opcode in self.ASMs:
ins = getattr(self.ASMs[opcode], opcode)
ins(self, operand)
if opcode == "prt":
# 调试函数,输出文字
print(operand[0])
elif opcode == "halt": # HALT
# 结束
return False
return True
def write_to_memory(self, address, data):
# 往内存写入数据
self.dtMemory[address] = data
def run(self):
# 开始执行
while True:
instruction = self.fetchInstruction()
if not self.executeInstruction(instruction):
break
class MultiCoreCPU:
def __init__(self, num_cores, registers, dt_memory, cmd_memory_size=-1):
self.cores = []
self.programs = queue.Queue(maxsize=num_cores)
if cmd_memory_size <= 0:
cmd_memory_size = dt_memory.size
for i in range(num_cores):
cmdMem = Memory(cmd_memory_size)
core = Core(i, registers, dt_memory, cmdMem)
self.cores.append(core)
def load_program(self, program):
self.programs.put_nowait(program)
def run(self):
oldCore = -1
while not self.programs.empty():
oldCore += 1
self.cores[oldCore].loadProgram(self.programs.get())
def halt(self):
for core in self.cores:
core.is_running = False
class CPU(Core):
def __init__(self, registers, dtMemory, cmdMemory):
super().__init__(0, registers, dtMemory, cmdMemory)
if __name__ == "__main__":
cpu = CPU(registers=64, dtMemory=1024, cmdMemory=1024)
program.Operand(["r0"], cpu)[0].set(2)
# f = open('Example/加法器.txt')
# code = f.read().split('\n')
# cpu.loadProgram(code)
cpu.write_to_memory(0, 1)
cpu.write_to_memory(1, 1)
# cpu.run()
print(cpu.registers[0])

41
Device/Disk.py Normal file
View File

@ -0,0 +1,41 @@
import json
import sqlite3
from Memory import Memory
class Storage(Memory):
def __init__(self, size):
super().__init__(1)
self.size = size
self.conn = sqlite3.connect("Storage.se")
self.create_table()
def create_table(self):
cursor = self.conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS KeyValue (
key INTEGER PRIMARY KEY,
value TEXT
)"""
)
self.conn.commit()
def __setitem__(self, key, value):
cursor = self.conn.cursor()
value = json.dumps([value])
cursor.execute(
"""
INSERT INTO KeyValue (key, value) VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value
""",
(key, value),
)
self.conn.commit()
def __getitem__(self, key):
cursor = self.conn.cursor()
cursor.execute("SELECT value FROM KeyValue WHERE key = ?", (key,))
result = cursor.fetchone()
if result:
return json.loads(result[0])[0] # 返回 value
return 0

71
Device/QTScreen.py Normal file
View File

@ -0,0 +1,71 @@
import tkinter
class DynamicPixelDisplay:
def __init__(self, width, height, pixel_size):
self.CANVAS_WIDTH = width
self.CANVAS_HEIGHT = height
self.PIXEL_SIZE = pixel_size
# 创建主窗口
self.root = tkinter.Tk()
self.root.title("Dynamic Pixel Display")
# 创建画布
self.canvas = tkinter.Canvas(
self.root, width=self.CANVAS_WIDTH, height=self.CANVAS_HEIGHT, bg="white"
)
self.canvas.pack()
# 创建一个二维列表来表示像素颜色,默认为白色
self.pixels = []
for y in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE):
row = []
for x in range(self.CANVAS_WIDTH // self.PIXEL_SIZE):
rect = self.canvas.create_rectangle(
x * self.PIXEL_SIZE,
y * self.PIXEL_SIZE,
(x + 1) * self.PIXEL_SIZE,
(y + 1) * self.PIXEL_SIZE,
fill="white",
)
row.append(rect)
self.pixels.append(row)
# 设置初始屏幕内容
self.initial_screen_content = []
for _ in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE):
row = ["white"] * (self.CANVAS_WIDTH // self.PIXEL_SIZE)
self.initial_screen_content.append(row)
def run(self):
self.root.mainloop()
# 修改像素颜色的方法
def set_pixel_color(self, x, y, color):
if (
0 <= x < self.CANVAS_WIDTH // self.PIXEL_SIZE
and 0 <= y < self.CANVAS_HEIGHT // self.PIXEL_SIZE
):
self.canvas.itemconfig(self.pixels[y][x], fill=color)
# 清空屏幕的方法
def clear_screen(self):
for y in range(self.CANVAS_HEIGHT // self.PIXEL_SIZE):
for x in range(self.CANVAS_WIDTH // self.PIXEL_SIZE):
self.set_pixel_color(x, y, "white")
# 用二维数组更新屏幕内容的方法
def update_screen(self, screen_content):
self.clear_screen() # 首先清空屏幕
# 根据二维数组的值更新屏幕内容
for y in range(len(screen_content)):
for x in range(len(screen_content[0])):
color = screen_content[y][x]
if isinstance(color, str):
self.set_pixel_color(x, y, color)
# 实例化DynamicPixelDisplay类
display = DynamicPixelDisplay(400, 400, 10)
display.run()

5
Example/加法器.txt Normal file
View File

@ -0,0 +1,5 @@
ldr 0 0
ldr 1 1
add 0 1
prtr 0
halt

53
Memory.py Normal file
View File

@ -0,0 +1,53 @@
from error import *
class Memory:
def __init__(self, size):
self.data = [0] * size
self.size = size
self.name = "内存"
def init(self):
print("重置" + self.name)
self.data = [0] * self.size
def __getitem__(self, key: int):
print("读取位于", key, "" + self.name)
if key > self.size:
raise AddressError
return self.data[key]
def write(self, index: int, val: int):
if index > self.size:
raise AddressError()
self.data[index] = val
def __len__(self):
return self.size
def __str__(self):
return str(self.data)
def __setitem__(self, k, v):
print("写入位于", k, "" + self.name)
if isinstance(k, slice):
if not k.start:
start = 0
else:
start = k.start
if not k.stop:
stop = self.size - 1
else:
stop = k.stop
if start > self.size or stop > self.size:
raise AddressError
self.data[start:stop] = v
return
if k > self.size:
raise AddressError
self.data[k] = v
return

2
error.py Normal file
View File

@ -0,0 +1,2 @@
class AddressError(Exception):
pass

73
program.py Normal file
View File

@ -0,0 +1,73 @@
from error import *
class Program:
def __init__(self, code: str):
self.codelist = code.split("\n")
class BaseArg:
def __init__(self, tp, val):
self.type = tp
self._val = val
def isRegister(self):
return self.type == "reg"
def isMemory(self):
return self.type == "mem"
def isImmediate(self):
return self.type == "int"
class RegisterArg(BaseArg):
def __init__(self, n, cpu):
super().__init__("reg", n)
if n > len(cpu.registers):
raise AddressError("寄存器不存在:" + str(n))
self.cpu = cpu
def set(self, val: int):
self.cpu.registers.write(self._val, val)
def get(self):
return self.cpu.registers[self._val]
class MemoryArg(BaseArg):
def __init__(self, n, cpu):
super().__init__("mem", n)
if n > len(cpu.dtMemory):
raise AddressError("内存地址不存在:" + str(n))
self.cpu = cpu
def set(self, val: int):
self.cpu.dtMemory.write(self._val, val)
def get(self):
return self.cpu.dtMemory[self._val]
class ImmediateArg(BaseArg):
def __init__(self, val):
super().__init__("int", val)
def get(self):
return self._val
class Operand:
def __init__(self, operList, cpu):
self.List = []
for i in operList:
if i[0] == "r":
self.List.append(RegisterArg(int(i[1:]), cpu))
elif i[0:1] == "0x":
index = int(i, 16)
self.List.append(MemoryArg(index, cpu))
elif i.isdigit():
self.List.append(ImmediateArg(int(i)))
def __getitem__(self, key: int):
return self.List[key]