import asyncio
import copy
import bisect
from collections import defaultdict
class BankSystem:
    """In-memory bank state: accounts, payments, merge records and per-account locks."""

    def __init__(self):
        """Create an empty bank with no accounts and no payments."""
        # Main data store: account_id -> {"balance", "outgoing", "history"},
        # where "history" is a list of (timestamp, balance) tuples.
        self.accounts = {}
        # Number of payments created so far; used to build ids such as "payment1".
        self.payment_counter = 0
        # payment_id -> {"account_id", "cashback_amount", "cashback_time", "received"}.
        self.payment = {}
        # Id of an absorbed account -> id of the account it was merged into.
        self.merged_accounts = {}
        # One asyncio.Lock per key, created lazily on first lookup by the defaultdict.
        self.account_locks = defaultdict(asyncio.Lock)
def create_account(self, timestamp, account_id):
    """Register a new account with a zero balance.

    Args:
        timestamp: Time of the call; also stored as the first history entry.
        account_id: Identifier of the account to create.

    Returns:
        True if the account was created, False if the id is already in use.
    """
    # Every public operation settles due cashbacks before doing anything else.
    self._process_cashbacks(timestamp)
    if account_id in self.accounts:
        return False
    self.accounts[account_id] = {
        "balance": 0,
        "outgoing": 0,
        "history": [(timestamp, 0)],
    }
    return True
def deposit(self, timestamp, account_id, amount):
    """Add ``amount`` to an account and record the new balance in its history.

    Args:
        timestamp: Time of the call; stored with the new balance.
        account_id: Account to credit.
        amount: Value added to the balance.

    Returns:
        The balance after the deposit, or None if the account does not exist.
    """
    self._process_cashbacks(timestamp)
    if account_id not in self.accounts:
        return None
    # `account` is the stored dict itself, so mutating it updates self.accounts.
    account = self.accounts[account_id]
    account["balance"] += amount
    account["history"].append((timestamp, account["balance"]))
    return account["balance"]
def transfer(self, timestamp, source_id, target_id, amount):
    """Move ``amount`` from one account to another.

    Args:
        timestamp: Time of the call; stored in both accounts' histories.
        source_id: Account to debit.
        target_id: Account to credit.
        amount: Value to move.

    Returns:
        The source balance after the transfer, or None when either account is
        missing, both ids are the same, or the source balance is below
        ``amount``.
    """
    self._process_cashbacks(timestamp)
    if source_id not in self.accounts or target_id not in self.accounts:
        return None
    if source_id == target_id:
        return None
    source = self.accounts[source_id]
    target = self.accounts[target_id]
    # Strictly "<": a balance exactly equal to the amount is enough.
    if source["balance"] < amount:
        return None

    source["balance"] -= amount
    source["outgoing"] += amount  # transfers count toward the sender's outgoing total
    source["history"].append((timestamp, source["balance"]))

    target["balance"] += amount
    target["history"].append((timestamp, target["balance"]))
    return source["balance"]
d["balance"] 唔係 d.balance;d[account_id] 用 variable,d["account_id"] 搵 literal string;acc = self.accounts[id] 改 acc 會改原本
def top_spenders(self, timestamp, n):
    """List the accounts with the largest outgoing totals.

    Args:
        timestamp: Time of the call.
        n: Maximum number of entries to return.

    Returns:
        A list of strings formatted as "<account_id>(<outgoing>)", ordered by
        outgoing total descending and then account id ascending.
    """
    self._process_cashbacks(timestamp)
    ranked = sorted(
        self.accounts.items(),
        # item is (account_id, account dict): negate outgoing for descending
        # order, with the id as an ascending tie-breaker.
        key=lambda item: (-item[1]["outgoing"], item[0]),
    )
    # Slicing tolerates n larger than the number of accounts.
    return [
        f"{account_id}({account['outgoing']})"
        for account_id, account in ranked[:n]
    ]
x[0] = key("alice"),x[1] = value(成個 dict);f"{name}({value})" → "alice(500)"
所有題都用。唔會自動到期,每次有人 call method 帶 timestamp 入嚟先 check。
def _process_cashbacks(self, timestamp):
    """Credit every pending cashback whose due time is at or before ``timestamp``.

    Cashbacks never fire on their own; they are settled lazily whenever a
    method that carries a timestamp calls this helper.

    Args:
        timestamp: Current time of the calling operation.
    """
    for payment in self.payment.values():
        if payment["received"] or timestamp < payment["cashback_time"]:
            continue

        # The paying account may have been merged into another one since the
        # payment was made; follow the merge records to the account that
        # exists now instead of raising KeyError on the deleted id.
        owner = payment["account_id"]
        while owner not in self.accounts and owner in self.merged_accounts:
            owner = self.merged_accounts[owner]
        account = self.accounts.get(owner)
        if account is None:
            # No account left to credit; keep the payment pending.
            continue

        account["balance"] += payment["cashback_amount"]
        # Record the credit at its due time so historical balance lookups see it.
        account["history"].append((payment["cashback_time"], account["balance"]))
        payment["received"] = True
def pay(self, timestamp, account_id, amount):
    """Withdraw ``amount`` from an account and schedule its cashback.

    Args:
        timestamp: Time of the call.
        account_id: Account to debit.
        amount: Value to pay.

    Returns:
        The new payment id ("payment<N>"), or None when the account does not
        exist or its balance is below ``amount``.
    """
    self._process_cashbacks(timestamp)
    if account_id not in self.accounts:
        return None
    account = self.accounts[account_id]
    if account["balance"] < amount:
        return None

    account["balance"] -= amount
    account["outgoing"] += amount
    account["history"].append((timestamp, account["balance"]))

    self.payment_counter += 1
    payment_id = f"payment{self.payment_counter}"
    self.payment[payment_id] = {
        "account_id": account_id,
        # 2% of the amount, rounded down by integer division.
        "cashback_amount": amount * 2 // 100,
        # Due 86400000 time units later (24 hours if timestamps are in
        # milliseconds; confirm against the task statement).
        "cashback_time": timestamp + 86400000,
        "received": False,
    }
    return payment_id
def get_payment_status(self, timestamp, account_id, payment_id):
    """Report whether a payment's cashback has been credited yet.

    Args:
        timestamp: Time of the call; due cashbacks are settled first, so the
            answer reflects this moment.
        account_id: Account that is expected to own the payment.
        payment_id: Id returned by ``pay``.

    Returns:
        "CASHBACK_RECEIVED" or "IN_PROGRESS", or None when the account does
        not exist, the payment does not exist, or the payment is recorded
        under a different account.
    """
    self._process_cashbacks(timestamp)
    if account_id not in self.accounts:
        return None
    payment = self.payment.get(payment_id)
    if payment is None or payment["account_id"] != account_id:
        return None
    return "CASHBACK_RECEIVED" if payment["received"] else "IN_PROGRESS"
def get_balance(self, timestamp, account_id, time_at):
    """Look up the balance an account had at a past moment.

    Args:
        timestamp: Time of the call.
        account_id: Account to inspect.
        time_at: Moment whose balance is requested.

    Returns:
        The balance from the latest history entry recorded at or before
        ``time_at``, or None when the account does not exist or has no entry
        that early.
    """
    self._process_cashbacks(timestamp)
    if account_id not in self.accounts:
        return None
    history = self.accounts[account_id]["history"]
    # Walk backwards so the first match is the most recent qualifying entry.
    return next(
        (balance for recorded_at, balance in reversed(history) if recorded_at <= time_at),
        None,
    )
def merge_accounts(self, timestamp, account_id_1, account_id_2):
    """Fold ``account_id_2`` into ``account_id_1`` and remove ``account_id_2``.

    Args:
        timestamp: Time of the call; stored with the merged balance.
        account_id_1: Surviving account.
        account_id_2: Account that is absorbed and deleted.

    Returns:
        True on success, False when either account is missing or both ids
        are the same.
    """
    self._process_cashbacks(timestamp)
    if account_id_1 not in self.accounts or account_id_2 not in self.accounts:
        return False
    if account_id_1 == account_id_2:
        return False

    survivor = self.accounts[account_id_1]
    absorbed = self.accounts.pop(account_id_2)
    survivor["balance"] += absorbed["balance"]
    survivor["outgoing"] += absorbed["outgoing"]
    survivor["history"].append((timestamp, survivor["balance"]))

    # Hand the absorbed account's payments to the survivor. Otherwise a later
    # cashback would look up the deleted id, and status queries made through
    # the surviving account would not find these payments.
    for payment in self.payment.values():
        if payment["account_id"] == account_id_2:
            payment["account_id"] = account_id_1

    self.merged_accounts[account_id_2] = account_id_1
    return True
import copy
snapshot = copy.deepcopy(self.accounts) # full, independent copy of the accounts dict
# Changing snapshot does not affect the original.
三步:1. 定義 execute_op 2. gather 同時跑 3. return list(results)
async def process_batch(self, timestamp, operations):
    """Run a batch of operations concurrently and collect their results.

    Args:
        timestamp: Time applied to every operation in the batch.
        operations: Iterable of dicts with a "type" key ("deposit",
            "transfer" or "pay") plus the fields that operation reads.

    Returns:
        A list with one result per operation, in input order. An operation
        with an unrecognised "type" yields None.
    """
    self._process_cashbacks(timestamp)

    async def execute_op(op):
        # Lock on the account being debited/credited; transfers carry
        # "source_id" instead of "account_id".
        lock_key = op.get("account_id") or op.get("source_id")
        async with self.account_locks[lock_key]:
            # -------- adapt the dispatch below to the task at hand --------
            kind = op["type"]
            if kind == "deposit":
                return self.deposit(timestamp, op["account_id"], op["amount"])
            if kind == "transfer":
                return self.transfer(
                    timestamp, op["source_id"], op["target_id"], op["amount"]
                )
            if kind == "pay":
                return self.pay(timestamp, op["account_id"], op["amount"])
            # -------- adapt the dispatch above to the task at hand --------
            return None

    # gather preserves input order in its results.
    results = await asyncio.gather(*[execute_op(op) for op in operations])
    return list(results)
同 L5 一樣,加 Semaphore。Lock 包住改 data,Sem 包住外部 call。唔好 return 太早。
async def process_external_transfers(self, timestamp, transfers, max_concurrent):
    """Debit accounts for outbound transfers, throttling the external calls.

    Args:
        timestamp: Time applied to every transfer in the batch.
        transfers: Iterable of dicts with "account_id" and "amount".
        max_concurrent: Maximum number of external calls in flight at once.

    Returns:
        A list of booleans in input order: True when the account was debited
        and the (simulated) external call completed, False when the account
        is missing or its balance is below the amount.
    """
    self._process_cashbacks(timestamp)
    external_slots = asyncio.Semaphore(max_concurrent)

    async def do_transfer(transfer):
        account_id = transfer["account_id"]
        amount = transfer["amount"]

        # The lock guards only the balance check and debit.
        async with self.account_locks[account_id]:
            # -------- adapt the check + debit below to the task at hand --------
            account = self.accounts.get(account_id)
            if account is None:
                return False
            if account["balance"] < amount:
                return False
            account["balance"] -= amount
            account["outgoing"] += amount
            account["history"].append((timestamp, account["balance"]))
            # -------- adapt the check + debit above to the task at hand --------

        # The account lock is released before waiting on the semaphore, so a
        # slow external call does not block other work on the same account.
        async with external_slots:
            await asyncio.sleep(0.01)  # stand-in for the external call
        return True

    results = await asyncio.gather(*[do_transfer(t) for t in transfers])
    return list(results)
- _init_ 一個底線 → ✅ __init__ 兩個底線
- def method(account_id): 漏 self → ✅ def method(self, account_id):
- d.balance dot → ✅ d["balance"] bracket
- accounts.append(acc) → ✅ accounts[key] = acc dict 用 key
- balance <= amount return None → ✅ balance < amount 等於夠錢要畀轉
- for x in dict: x.key → ✅ x 已經係 key string
- payment = d[pid]: 多咗冒號 → ✅ 唔係 function 唔使冒號
- return IN_PROGRESS → ✅ return "IN_PROGRESS" 要引號
- acc.amount = amount 覆蓋 → ✅ acc["balance"] += amount 累加
- copy_ref = original 同一個 object → ✅ copy.deepcopy(original)
- async with lock: return ... 之後嘅 code 行唔到 → ✅ 唔好 return 太早,lock 同 sem 分開
| Error | 意思 | 點修 |
|---|---|---|
| TypeError: takes N args but M given | param 數量錯 | check 有冇 self / 有冇漏 timestamp |
| AttributeError: 'dict' has no attribute 'X' | 用咗 dot access dict | 改做 d["X"] |
| KeyError: 'X' | dict 冇呢個 key | 先 check if key in d |
| AssertionError: None is not true | function 冇 return | 加 return True/False |
| NameError: name 'X' not defined | variable 未定義 | check 串字 / scope |
口訣:1. 睇 Error 類型 2. 睇 > 嗰行(test 點 call 你) 3. 對比你嘅 code
| 要做咩 | Code |
|---|---|
| 建空 dict | d = {} |
| 加/改 | d[key] = value |
| 刪 | del d[key] |
| check 存在 | if key in d: |
| 攞 value(safe) | d.get(key) |
| loop dict | for k, v in d.items(): |
| 建空 list | lst = [] |
| 加去尾 | lst.append(x) |
| 頭 n 個 | lst[:n] |
| format string | f"{name}({value})" |
| join list | ", ".join(lst) |
| sort desc+asc | sorted(items, key=lambda x: (-x[1], x[0])) |
| deep copy | copy.deepcopy(data) |
| reversed loop | for ts, val in reversed(lst): |
| auto ID | self.counter += 1; id = f"item{self.counter}" |
| round down | amount * 2 // 100 |
| 同時跑 | await asyncio.gather(*[fn(x) for x in items]) |
| 鎖住 | async with lock: |
| 限制數量 | sem = asyncio.Semaphore(n); async with sem: |