1193 lines
48 KiB
Python
1193 lines
48 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
整合 NFC 生產工具與腳本執行監控
|
||
Flask + Flask-SocketIO
|
||
"""
|
||
|
||
# 必須在導入其他模組之前執行 eventlet.monkey_patch()
|
||
import eventlet
|
||
eventlet.monkey_patch()
|
||
|
||
import os
|
||
import subprocess
|
||
import threading
|
||
import queue
|
||
import re
|
||
from datetime import datetime
|
||
from flask import Flask, render_template, request, jsonify
|
||
from flask_socketio import SocketIO, emit
|
||
from os.path import dirname, basename, abspath
|
||
import json
|
||
import shutil
|
||
import urllib.parse
|
||
import requests
|
||
|
||
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = 'nfc_web_controller_secret_key'
|
||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||
|
||
exe_dir = 'linux64_release'
|
||
CONFIG_FILE = 'config.json'
|
||
|
||
def load_config():
|
||
"""載入設定檔"""
|
||
if os.path.exists(CONFIG_FILE):
|
||
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {'api_url': ''}
|
||
|
||
def save_config(config):
|
||
"""儲存設定檔"""
|
||
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(config, f, ensure_ascii=False, indent=2)
|
||
|
||
# 載入設定
|
||
config = load_config()
|
||
|
||
def get_keys_path():
|
||
return os.path.join(exe_dir, 'keys.txt')
|
||
def get_urls_path():
|
||
return os.path.join(exe_dir, 'urls.txt')
|
||
def get_default_exe_path():
|
||
return os.path.join(exe_dir, 'nt4h_c_example')
|
||
|
||
nt4h_exe_path = get_default_exe_path()
|
||
@app.route('/')
|
||
def index():
|
||
return render_template('index.html')
|
||
|
||
@app.route('/api/set_exe_dir', methods=['POST'])
|
||
def api_set_exe_dir():
|
||
global exe_dir, nt4h_exe_path
|
||
data = request.get_json()
|
||
new_dir = data.get('exe_dir')
|
||
if not new_dir or not os.path.isdir(new_dir):
|
||
return jsonify({'success': False, 'message': '資料夾不存在'}), 400
|
||
exe_dir = new_dir
|
||
# 跟著自動切換執行檔路徑
|
||
nt4h_exe_path = get_default_exe_path()
|
||
return jsonify({'success': True, 'exe_dir': exe_dir, 'exe_path': nt4h_exe_path})
|
||
|
||
@app.route('/api/keys')
|
||
def api_keys():
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
return jsonify({'success': False, 'message': '找不到 keys.txt'}), 404
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
return jsonify({'success': True, 'keys': keys})
|
||
|
||
@app.route('/api/urls')
|
||
def api_urls():
|
||
urls_path = get_urls_path()
|
||
if not os.path.exists(urls_path):
|
||
return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
|
||
with open(urls_path, 'r', encoding='utf-8') as f:
|
||
urls = [line.strip() for line in f if line.strip()]
|
||
return jsonify({'success': True, 'urls': urls})
|
||
|
||
@app.route('/api/get_url_by_index', methods=['POST'])
|
||
def api_get_url_by_index():
|
||
"""根據索引獲取特定 URL"""
|
||
data = request.get_json()
|
||
index = data.get('index')
|
||
if not index or not index.isdigit():
|
||
return jsonify({'success': False, 'message': '無效的索引'}), 400
|
||
|
||
urls_path = get_urls_path()
|
||
if not os.path.exists(urls_path):
|
||
return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
|
||
|
||
with open(urls_path, 'r', encoding='utf-8') as f:
|
||
urls = [line.strip() for line in f if line.strip()]
|
||
|
||
idx = int(index) - 1 # 轉換為 0-based 索引
|
||
if idx < 0 or idx >= len(urls):
|
||
return jsonify({'success': False, 'message': '索引超出範圍'}), 400
|
||
|
||
return jsonify({'success': True, 'url': urls[idx], 'index': index})
|
||
|
||
@app.route('/api/update_url_by_index', methods=['POST'])
|
||
def api_update_url_by_index():
|
||
"""更新特定索引的 URL"""
|
||
data = request.get_json()
|
||
index = data.get('index')
|
||
new_url = data.get('url', '').strip()
|
||
|
||
if not index or not index.isdigit():
|
||
return jsonify({'success': False, 'message': '無效的索引'}), 400
|
||
|
||
if not new_url:
|
||
return jsonify({'success': False, 'message': 'URL 不得為空'}), 400
|
||
|
||
urls_path = get_urls_path()
|
||
if not os.path.exists(urls_path):
|
||
return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
|
||
|
||
# 讀取所有 URL
|
||
with open(urls_path, 'r', encoding='utf-8') as f:
|
||
urls = [line.strip() for line in f if line.strip()]
|
||
|
||
idx = int(index) - 1 # 轉換為 0-based 索引
|
||
if idx < 0 or idx >= len(urls):
|
||
return jsonify({'success': False, 'message': '索引超出範圍'}), 400
|
||
|
||
# 更新 URL
|
||
urls[idx] = new_url
|
||
|
||
# 寫回檔案
|
||
try:
|
||
with open(urls_path, 'w', encoding='utf-8') as f:
|
||
for url in urls:
|
||
f.write(url + '\n')
|
||
return jsonify({'success': True, 'message': 'URL 更新成功'})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
|
||
|
||
@app.route('/api/add_url', methods=['POST'])
|
||
def api_add_url():
|
||
"""添加新的 URL"""
|
||
data = request.get_json()
|
||
new_url = data.get('url', '').strip()
|
||
|
||
if not new_url:
|
||
return jsonify({'success': False, 'message': 'URL 不得為空'}), 400
|
||
|
||
urls_path = get_urls_path()
|
||
if not os.path.exists(urls_path):
|
||
return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
|
||
|
||
# 讀取現有 URL
|
||
with open(urls_path, 'r', encoding='utf-8') as f:
|
||
urls = [line.strip() for line in f if line.strip()]
|
||
|
||
# 添加新 URL
|
||
urls.append(new_url)
|
||
|
||
# 寫回檔案
|
||
try:
|
||
with open(urls_path, 'w', encoding='utf-8') as f:
|
||
for url in urls:
|
||
f.write(url + '\n')
|
||
return jsonify({'success': True, 'message': 'URL 添加成功', 'new_index': len(urls)})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
|
||
|
||
@app.route('/api/delete_url_by_index', methods=['POST'])
|
||
def api_delete_url_by_index():
|
||
"""刪除特定索引的 URL"""
|
||
data = request.get_json()
|
||
index = data.get('index')
|
||
|
||
if not index or not index.isdigit():
|
||
return jsonify({'success': False, 'message': '無效的索引'}), 400
|
||
|
||
urls_path = get_urls_path()
|
||
if not os.path.exists(urls_path):
|
||
return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
|
||
|
||
# 讀取所有 URL
|
||
with open(urls_path, 'r', encoding='utf-8') as f:
|
||
urls = [line.strip() for line in f if line.strip()]
|
||
|
||
idx = int(index) - 1 # 轉換為 0-based 索引
|
||
if idx < 0 or idx >= len(urls):
|
||
return jsonify({'success': False, 'message': '索引超出範圍'}), 400
|
||
|
||
# 刪除 URL
|
||
deleted_url = urls.pop(idx)
|
||
|
||
# 寫回檔案
|
||
try:
|
||
with open(urls_path, 'w', encoding='utf-8') as f:
|
||
for url in urls:
|
||
f.write(url + '\n')
|
||
return jsonify({'success': True, 'message': 'URL 刪除成功', 'deleted_url': deleted_url})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
|
||
|
||
@app.route('/api/save_urls_file', methods=['POST'])
|
||
def api_save_urls_file():
|
||
"""保存整個 URLs 文件內容"""
|
||
data = request.get_json()
|
||
urls_content = data.get('urls_content', '')
|
||
|
||
urls_path = get_urls_path()
|
||
|
||
try:
|
||
# 將內容按行分割,過濾空行
|
||
urls = [line.strip() for line in urls_content.split('\n') if line.strip()]
|
||
|
||
# 寫入檔案
|
||
with open(urls_path, 'w', encoding='utf-8') as f:
|
||
for url in urls:
|
||
f.write(url + '\n')
|
||
|
||
return jsonify({'success': True, 'message': 'URLs 文件保存成功', 'urls_count': len(urls)})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
|
||
|
||
@app.route('/api/set_exe_path', methods=['POST'])
|
||
def api_set_exe_path():
|
||
global nt4h_exe_path
|
||
data = request.get_json()
|
||
exe_path = data.get('exe_path')
|
||
if not exe_path or not os.path.isfile(exe_path):
|
||
return jsonify({'success': False, 'message': '檔案不存在'}), 400
|
||
nt4h_exe_path = exe_path
|
||
return jsonify({'success': True, 'exe_path': nt4h_exe_path})
|
||
|
||
@app.route('/api/set_api_url', methods=['POST'])
|
||
def api_set_api_url():
|
||
global config
|
||
data = request.get_json()
|
||
api_url = data.get('api_url')
|
||
if not api_url:
|
||
return jsonify({'success': False, 'message': '未提供 API URL'}), 400
|
||
config['api_url'] = api_url
|
||
save_config(config)
|
||
return jsonify({'success': True, 'api_url': api_url})
|
||
|
||
@app.route('/api/set_api_token', methods=['POST'])
|
||
def api_set_api_token():
|
||
global config
|
||
data = request.get_json()
|
||
api_token = data.get('api_token')
|
||
if not api_token:
|
||
return jsonify({'success': False, 'message': '未提供 API Token'}), 400
|
||
config['api_token'] = api_token
|
||
save_config(config)
|
||
return jsonify({'success': True, 'message': 'API Token 設定成功'})
|
||
|
||
@app.route('/api/get_api_token')
|
||
def api_get_api_token():
|
||
return jsonify({'api_token': config.get('api_token', '')})
|
||
|
||
@app.route('/api/get_api_url')
|
||
def api_get_api_url():
|
||
return jsonify({'api_url': config.get('api_url', '')})
|
||
|
||
@app.route('/api/get_name')
|
||
def api_get_name():
|
||
return jsonify({'name': config.get('name', '')})
|
||
|
||
@app.route('/api/set_name', methods=['POST'])
|
||
def api_set_name():
|
||
data = request.get_json()
|
||
name = data.get('name', '')
|
||
config['name'] = name
|
||
save_config(config)
|
||
return jsonify({'success': True, 'name': name})
|
||
|
||
@app.route('/api/get_names', methods=['GET'])
|
||
def api_get_names():
|
||
names = config.get('names', [])
|
||
current = config.get('name', '')
|
||
return jsonify({'names': names, 'current': current})
|
||
|
||
@app.route('/api/add_name', methods=['POST'])
|
||
def api_add_name():
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
if not name:
|
||
return jsonify({'success': False, 'message': '名稱不得為空'}), 400
|
||
names = config.get('names', [])
|
||
if name not in names:
|
||
names.append(name)
|
||
config['names'] = names
|
||
# 不自動設定為當前選擇,保持空白
|
||
save_config(config)
|
||
return jsonify({'success': True, 'names': names, 'current': ''})
|
||
|
||
@app.route('/api/delete_name', methods=['POST'])
|
||
def api_delete_name():
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
if not name:
|
||
return jsonify({'success': False, 'message': '名稱不得為空'}), 400
|
||
names = config.get('names', [])
|
||
if name in names:
|
||
names.remove(name)
|
||
config['names'] = names
|
||
# 如果刪除的是當前選中的名稱,清空當前選擇
|
||
if config.get('name') == name:
|
||
config['name'] = ''
|
||
save_config(config)
|
||
return jsonify({'success': True, 'names': names, 'current': config.get('name', '')})
|
||
else:
|
||
return jsonify({'success': False, 'message': '名稱不存在'}), 400
|
||
|
||
def run_and_emit(cmd, exe_dir, success_keywords=None, stop_on_success=False):
|
||
"""
|
||
執行指令並即時推送結果到前端。
|
||
若 success_keywords 有設定,遇到任一關鍵字就回傳 True(可用於 early stop)。
|
||
"""
|
||
socketio.emit('script_output', {'line': f'$ {" ".join(cmd)}'})
|
||
try:
|
||
import os
|
||
import select
|
||
import sys
|
||
|
||
# 設定環境變數確保即時輸出
|
||
env = os.environ.copy()
|
||
env['PYTHONUNBUFFERED'] = '1'
|
||
env['PYTHONIOENCODING'] = 'utf-8'
|
||
|
||
# 使用 pty 來獲得更真實的終端體驗
|
||
try:
|
||
import pty
|
||
master, slave = pty.openpty()
|
||
|
||
proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=slave, stderr=slave,
|
||
stdin=slave, env=env, preexec_fn=os.setsid)
|
||
os.close(slave)
|
||
|
||
output_lines = []
|
||
success = False
|
||
|
||
# 使用 select 來非阻塞讀取
|
||
while True:
|
||
try:
|
||
# 檢查進程是否還在運行
|
||
if proc.poll() is not None:
|
||
break
|
||
|
||
# 使用 select 等待數據
|
||
ready, _, _ = select.select([master], [], [], 0.1)
|
||
|
||
if ready:
|
||
data = os.read(master, 1024).decode('utf-8', errors='ignore')
|
||
if data:
|
||
# 按行分割並發送
|
||
lines = data.split('\n')
|
||
for i, line in enumerate(lines):
|
||
if i < len(lines) - 1: # 完整的行
|
||
clean_line = line.rstrip('\r')
|
||
if clean_line: # 只發送非空行
|
||
socketio.emit('script_output', {'line': clean_line})
|
||
output_lines.append(clean_line)
|
||
|
||
# 檢查成功關鍵字
|
||
if success_keywords and any(kw in clean_line for kw in success_keywords):
|
||
success = True
|
||
if stop_on_success:
|
||
proc.terminate()
|
||
break
|
||
else: # 最後一行可能不完整
|
||
if line.strip():
|
||
# 保存到緩衝區,等待更多數據
|
||
pass
|
||
|
||
if success and stop_on_success:
|
||
break
|
||
|
||
except OSError:
|
||
break
|
||
|
||
# 讀取剩餘的輸出
|
||
try:
|
||
remaining = os.read(master, 4096).decode('utf-8', errors='ignore')
|
||
if remaining:
|
||
for line in remaining.split('\n'):
|
||
clean_line = line.rstrip('\r')
|
||
if clean_line:
|
||
socketio.emit('script_output', {'line': clean_line})
|
||
output_lines.append(clean_line)
|
||
except OSError:
|
||
pass
|
||
|
||
os.close(master)
|
||
proc.wait()
|
||
|
||
except ImportError:
|
||
# 如果 pty 不可用,回退到標準方法
|
||
proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||
universal_newlines=True, bufsize=1, env=env)
|
||
output_lines = []
|
||
success = False
|
||
|
||
for line in iter(proc.stdout.readline, ''):
|
||
if line:
|
||
clean_line = line.rstrip('\r\n')
|
||
socketio.emit('script_output', {'line': clean_line})
|
||
output_lines.append(clean_line)
|
||
|
||
if success_keywords and any(kw in clean_line for kw in success_keywords):
|
||
success = True
|
||
if stop_on_success:
|
||
break
|
||
|
||
proc.wait()
|
||
|
||
return output_lines, success
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'執行指令時發生錯誤: {e}'})
|
||
return [], False
|
||
|
||
def find_valid_oldkey(keys, exe_file, exe_dir):
|
||
"""
|
||
依序 getuid --key <行號>,遇到成功就停下,回傳 (oldkey行號, output_lines)
|
||
"""
|
||
for idx in range(len(keys)):
|
||
key_no = str(idx+1)
|
||
cmd = [f'./{exe_file}', 'getuid', '--key', key_no]
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功讀取 UID', '=== UID 讀取完成 ==='], stop_on_success=False)
|
||
if success:
|
||
socketio.emit('script_output', {'line': f'找到可用 oldkey:第{key_no}行'})
|
||
return key_no, output_lines
|
||
return None, []
|
||
|
||
@app.route('/api/run', methods=['POST'])
|
||
def api_run():
|
||
data = request.get_json()
|
||
key = data.get('key')
|
||
url = data.get('url')
|
||
name = data.get('name', '')
|
||
if not key:
|
||
return jsonify({'success': False, 'message': '未選擇 key'}), 400
|
||
|
||
# 儲存名稱到 config
|
||
if name:
|
||
config['name'] = name
|
||
save_config(config)
|
||
|
||
thread = threading.Thread(target=run_nfc_produce, args=(key, url, name))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始執行生產新SDM流程'})
|
||
|
||
def run_nfc_produce(newkey, url, name=''):
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
socketio.emit('script_output', {'line': '找不到 keys.txt,無法生產NFC'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
oldkey, _ = find_valid_oldkey(keys, exe_file, exe_dir)
|
||
if not oldkey:
|
||
socketio.emit('script_output', {'line': '未找到可用的 oldkey,流程結束'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
|
||
# 執行 changekey
|
||
socketio.emit('script_output', {'line': '🔄 開始變更金鑰...'})
|
||
cmd = [f'./{exe_file}', 'changekey', '--auth-key', oldkey, '--new-key', newkey, '--old-key', oldkey, '--key-no', '0']
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功變更金鑰', '金鑰變更完成'], stop_on_success=False)
|
||
|
||
if not success:
|
||
socketio.emit('script_output', {'line': '❌ 變更金鑰失敗,流程結束'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
|
||
socketio.emit('script_output', {'line': '✅ 成功變更金鑰!'})
|
||
|
||
# 執行 setsdm
|
||
socketio.emit('script_output', {'line': '🔄 開始設定 SDM...'})
|
||
cmd = [f'./{exe_file}', 'setsdm', '--url', url, '--key', newkey]
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['快速 SDM 設定完成', 'SDM 設定完成'], stop_on_success=False)
|
||
|
||
if not success:
|
||
socketio.emit('script_output', {'line': '❌ 設定 SDM 失敗,流程結束'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
|
||
socketio.emit('script_output', {'line': '✅ 成功設定 SDM!'})
|
||
|
||
# 執行 verify
|
||
socketio.emit('script_output', {'line': '🔄 開始 CMAC 認證...'})
|
||
cmd = [f'./{exe_file}', 'verify', '--key', newkey]
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
|
||
# 解析 verify 結果
|
||
uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, newkey, keys)
|
||
|
||
if uid and sdm_ctr and cmac:
|
||
socketio.emit('script_output', {'line': f'✅ CMAC 認證成功!'})
|
||
socketio.emit('script_output', {'line': f'UID: {uid}'})
|
||
socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'})
|
||
socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'})
|
||
|
||
# POST 到 API
|
||
post_to_api(uid, sdm_ctr, cmac, newkey, name)
|
||
else:
|
||
socketio.emit('script_output', {'line': '❌ CMAC 認證失敗,無法解析結果'})
|
||
|
||
socketio.emit('script_finished', {'message': '生產新SDM流程結束'})
|
||
|
||
@app.route('/api/read_nfc', methods=['POST'])
|
||
def api_read_nfc():
|
||
thread = threading.Thread(target=read_nfc)
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始批次讀取所有NFC'})
|
||
|
||
def read_nfc():
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
socketio.emit('script_output', {'line': '找不到 keys.txt,無法批次讀取NFC'})
|
||
socketio.emit('script_finished', {'message': '讀取結束'})
|
||
return
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
exe_dir_path = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
for idx in range(len(keys)):
|
||
key_no = str(idx+1)
|
||
cmd = [f'./{exe_file}', 'getuid', '--key', key_no]
|
||
output_lines, success = run_and_emit(cmd, exe_dir_path, success_keywords=['成功讀取 UID', '=== UID 讀取完成 ==='], stop_on_success=False)
|
||
if success:
|
||
socketio.emit('script_output', {'line': f'成功讀取到 UID,停止讀取'})
|
||
break
|
||
socketio.emit('script_finished', {'message': '讀取結束'})
|
||
|
||
@app.route('/api/verify', methods=['POST'])
|
||
def api_verify():
|
||
data = request.get_json()
|
||
key = data.get('key')
|
||
name = data.get('name', '')
|
||
if not key:
|
||
return jsonify({'success': False, 'message': '未選擇 key'}), 400
|
||
|
||
# 儲存名稱到 config
|
||
if name:
|
||
config['name'] = name
|
||
save_config(config)
|
||
|
||
thread = threading.Thread(target=verify_nfc, args=(key, name))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始 CMAC 認證流程'})
|
||
|
||
def parse_verify_output(output_lines, key_no, keys):
|
||
uid = None
|
||
sdm_ctr = None
|
||
cmac = None
|
||
for line in output_lines:
|
||
if 'UID:' in line:
|
||
uid = line.split('UID:')[1].strip()
|
||
elif 'SDM 讀取計數器:' in line:
|
||
sdm_ctr = line.split('SDM 讀取計數器:')[1].strip()
|
||
elif 'ASCII MAC 資料:' in line:
|
||
cmac = line.split('ASCII MAC 資料:')[1].strip()
|
||
key_value = keys[int(key_no)-1] if key_no and key_no.isdigit() and 0 < int(key_no) <= len(keys) else ''
|
||
return uid, sdm_ctr, cmac, key_value
|
||
|
||
def post_to_api(uid, sdm_ctr, cmac, key_index, name=''):
|
||
"""將 UID、SDM 計數器、CMAC 資料 POST 到指定的 API URL"""
|
||
api_url = config.get('api_url', '')
|
||
if not api_url:
|
||
socketio.emit('script_output', {'line': '⚠️ 未設定資料API URL,跳過POST請求'})
|
||
return
|
||
|
||
# 檢查是否有 API Token
|
||
api_token = config.get('api_token', '')
|
||
if not api_token:
|
||
socketio.emit('script_output', {'line': '⚠️ 未設定 API Token,跳過POST請求'})
|
||
return
|
||
|
||
# 構建 POST 請求的 URL
|
||
post_url = f"{api_url}/nfc/insertForProduction"
|
||
|
||
# 準備 JSON 資料
|
||
json_data = {
|
||
'uid': uid,
|
||
'ctr': sdm_ctr,
|
||
'key': key_index
|
||
}
|
||
|
||
# 準備請求標頭,包含 API Token
|
||
headers = {
|
||
'Authorization': f'Bearer {api_token}',
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
socketio.emit('script_output', {'line': f'🔄 正在發送資料到 API: {post_url}'})
|
||
socketio.emit('script_output', {'line': f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}'})
|
||
socketio.emit('script_output', {'line': f'🔐 使用 API Token: {api_token[:20]}...'})
|
||
|
||
try:
|
||
# 發送 POST 請求,包含 API Token
|
||
response = requests.post(post_url, json=json_data, headers=headers, timeout=30)
|
||
|
||
socketio.emit('script_output', {'line': f'📡 API 回應狀態碼: {response.status_code}'})
|
||
|
||
if response.status_code == 200:
|
||
socketio.emit('script_output', {'line': '✅ 資料成功發送到 API'})
|
||
try:
|
||
# 檢查回應內容是否為空
|
||
if response.text and response.text.strip():
|
||
response_data = response.json()
|
||
socketio.emit('script_output', {'line': f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '📄 API 回應: (空回應)'})
|
||
except json.JSONDecodeError as e:
|
||
socketio.emit('script_output', {'line': f'📄 API 回應 (非JSON格式): {response.text}'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'📄 API 回應解析錯誤: {str(e)}'})
|
||
socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'})
|
||
elif response.status_code == 401:
|
||
socketio.emit('script_output', {'line': '❌ API 認證失敗,請檢查 API Token 是否正確'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
elif response.status_code == 500:
|
||
socketio.emit('script_output', {'line': '❌ 伺服器內部錯誤 (500)'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': f'❌ API 請求失敗,狀態碼: {response.status_code}'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
|
||
except requests.exceptions.Timeout:
|
||
socketio.emit('script_output', {'line': '❌ API 請求超時'})
|
||
except requests.exceptions.ConnectionError:
|
||
socketio.emit('script_output', {'line': '❌ 無法連接到 API 伺服器'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'❌ API 請求發生錯誤: {str(e)}'})
|
||
|
||
def verify_nfc(key_no, name=''):
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
socketio.emit('script_output', {'line': '找不到 keys.txt,無法認證'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
# 執行 CMAC 認證
|
||
cmd = [f'./{exe_file}', 'verify', '--key', key_no]
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['MAC 驗證成功'], stop_on_success=False)
|
||
|
||
# 檢查本地認證結果
|
||
if success:
|
||
socketio.emit('script_output', {'line': f'✅ 本地 CMAC 認證成功!'})
|
||
|
||
# 解析結果
|
||
uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, key_no, keys)
|
||
|
||
if uid and sdm_ctr and cmac:
|
||
socketio.emit('script_output', {'line': f'UID: {uid}'})
|
||
socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'})
|
||
socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'})
|
||
|
||
# 進行遠端認證
|
||
verify_remote(uid, sdm_ctr, cmac, key_no)
|
||
else:
|
||
socketio.emit('script_output', {'line': '❌ 無法解析認證結果'})
|
||
else:
|
||
# 檢查是否有 MAC 驗證失敗的錯誤碼
|
||
mac_failed = False
|
||
for line in output_lines:
|
||
if 'MAC 驗證失敗' in line and '0x' in line:
|
||
socketio.emit('script_output', {'line': f'❌ 本地 CMAC 認證失敗: {line.strip()}'})
|
||
mac_failed = True
|
||
break
|
||
|
||
if not mac_failed:
|
||
socketio.emit('script_output', {'line': '❌ 本地 CMAC 認證失敗'})
|
||
|
||
socketio.emit('script_finished', {'message': 'CMAC 認證流程結束'})
|
||
|
||
def verify_remote(uid, sdm_ctr, cmac, key_no):
|
||
"""進行遠端 CMAC 認證"""
|
||
verify_url = f"https://verify.contree.app/verify?uid={uid}&ctr={sdm_ctr}&cmac={cmac}&key={key_no}"
|
||
|
||
socketio.emit('script_output', {'line': '🔄 開始遠端認證...'})
|
||
socketio.emit('script_output', {'line': f'認證 URL: {verify_url}'})
|
||
|
||
try:
|
||
# 發送 GET 請求到遠端認證服務
|
||
response = requests.get(verify_url, timeout=30)
|
||
|
||
socketio.emit('script_output', {'line': f'📡 遠端認證回應狀態碼: {response.status_code}'})
|
||
|
||
if response.status_code == 200:
|
||
try:
|
||
# 檢查回應內容是否為空
|
||
if response.text and response.text.strip():
|
||
response_data = response.json()
|
||
|
||
if response_data.get('success') == True:
|
||
socketio.emit('script_output', {'line': '✅ 線上認證成功!'})
|
||
socketio.emit('script_output', {'line': f'📄 認證訊息: {response_data.get("message", "")}'})
|
||
socketio.emit('script_output', {'line': f'📄 認證時間: {response_data.get("timestamp", "")}'})
|
||
|
||
# 顯示詳細資訊
|
||
details = response_data.get('details', {})
|
||
if details:
|
||
socketio.emit('script_output', {'line': f'📄 認證詳情:'})
|
||
socketio.emit('script_output', {'line': f' UID: {details.get("uid", "")}'})
|
||
socketio.emit('script_output', {'line': f' 計數器: {details.get("counter", "")}'})
|
||
socketio.emit('script_output', {'line': f' CMAC: {details.get("cmac", "")}'})
|
||
socketio.emit('script_output', {'line': f' 金鑰索引: {details.get("key_index", "")}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '❌ 線上認證失敗'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤訊息: {response_data.get("message", "")}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '📄 遠端認證回應: (空回應)'})
|
||
except json.JSONDecodeError as e:
|
||
socketio.emit('script_output', {'line': f'📄 遠端認證回應 (非JSON格式): {response.text}'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'📄 遠端認證回應解析錯誤: {str(e)}'})
|
||
socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': f'❌ 遠端認證請求失敗,狀態碼: {response.status_code}'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
|
||
except requests.exceptions.Timeout:
|
||
socketio.emit('script_output', {'line': '❌ 遠端認證請求超時'})
|
||
except requests.exceptions.ConnectionError:
|
||
socketio.emit('script_output', {'line': '❌ 無法連接到遠端認證服務'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'❌ 遠端認證發生錯誤: {str(e)}'})
|
||
|
||
def post_to_verify_api(uid, name=''):
|
||
"""將 UID POST 到資料 API 進行檢驗"""
|
||
api_url = config.get('api_url', '')
|
||
if not api_url:
|
||
socketio.emit('script_output', {'line': '⚠️ 未設定資料API URL,跳過POST請求'})
|
||
return
|
||
|
||
# 檢查是否有 API Token
|
||
api_token = config.get('api_token', '')
|
||
if not api_token:
|
||
socketio.emit('script_output', {'line': '⚠️ 未設定 API Token,跳過POST請求'})
|
||
return
|
||
|
||
# 構建 POST 請求的 URL
|
||
post_url = f"{api_url}/nfc/verifyForProduction"
|
||
|
||
# 準備 JSON 資料
|
||
json_data = {
|
||
'uid': uid
|
||
}
|
||
|
||
# 準備請求標頭,包含 API Token
|
||
headers = {
|
||
'Authorization': f'Bearer {api_token}',
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
socketio.emit('script_output', {'line': f'🔄 正在發送檢驗資料到 API: {post_url}'})
|
||
socketio.emit('script_output', {'line': f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}'})
|
||
socketio.emit('script_output', {'line': f'🔐 使用 API Token: {api_token[:20]}...'})
|
||
|
||
try:
|
||
# 發送 POST 請求,包含 API Token
|
||
response = requests.post(post_url, json=json_data, headers=headers, timeout=30)
|
||
|
||
socketio.emit('script_output', {'line': f'📡 API 回應狀態碼: {response.status_code}'})
|
||
|
||
if response.status_code == 200:
|
||
socketio.emit('script_output', {'line': '✅ 檢驗資料成功發送到 API'})
|
||
try:
|
||
# 檢查回應內容是否為空
|
||
if response.text and response.text.strip():
|
||
response_data = response.json()
|
||
socketio.emit('script_output', {'line': f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '📄 API 回應: (空回應)'})
|
||
except json.JSONDecodeError as e:
|
||
socketio.emit('script_output', {'line': f'📄 API 回應 (非JSON格式): {response.text}'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'📄 API 回應解析錯誤: {str(e)}'})
|
||
socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'})
|
||
elif response.status_code == 401:
|
||
socketio.emit('script_output', {'line': '❌ API 認證失敗,請檢查 API Token 是否正確'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
elif response.status_code == 500:
|
||
socketio.emit('script_output', {'line': '❌ 伺服器內部錯誤 (500)'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
else:
|
||
socketio.emit('script_output', {'line': f'❌ API 請求失敗,狀態碼: {response.status_code}'})
|
||
socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'})
|
||
|
||
except requests.exceptions.Timeout:
|
||
socketio.emit('script_output', {'line': '❌ API 請求超時'})
|
||
except requests.exceptions.ConnectionError:
|
||
socketio.emit('script_output', {'line': '❌ 無法連接到 API 伺服器'})
|
||
except Exception as e:
|
||
socketio.emit('script_output', {'line': f'❌ API 請求發生錯誤: {str(e)}'})
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# ------------------ 手動測試功能 ------------------
|
||
@app.route('/manual_test')
|
||
def manual_test_page():
|
||
return render_template('manual_test.html')
|
||
|
||
@app.route('/api/manual_verify', methods=['POST'])
|
||
def api_manual_verify():
|
||
"""手動驗證 API"""
|
||
data = request.get_json()
|
||
uid = data.get('uid', '').strip()
|
||
ctr = data.get('ctr', '').strip()
|
||
cmac = data.get('cmac', '').strip()
|
||
key_index = data.get('key_index', '').strip()
|
||
url_index = data.get('url_index', '').strip()
|
||
full_url = data.get('full_url', '').strip()
|
||
quiet_mode = data.get('quiet_mode', False)
|
||
|
||
if not uid or not ctr or not cmac:
|
||
return jsonify({'success': False, 'message': 'UID、計數器和 CMAC 為必填欄位'}), 400
|
||
|
||
thread = threading.Thread(target=run_manual_verify, args=(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始手動驗證流程'})
|
||
|
||
def run_manual_verify(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode):
|
||
"""執行手動驗證"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'verify', '--manual', '--uid', uid, '--ctr', ctr, '--cmac', cmac]
|
||
|
||
if key_index:
|
||
cmd.extend(['--key', key_index])
|
||
if url_index:
|
||
cmd.extend(['--url-index', url_index])
|
||
if full_url:
|
||
cmd = [f'./{exe_file}', 'verify', '--manual', '--url', full_url]
|
||
if key_index:
|
||
cmd.extend(['--key', key_index])
|
||
if quiet_mode:
|
||
cmd.append('--quiet')
|
||
|
||
socketio.emit('script_output', {'line': f'執行手動驗證: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '手動驗證完成'})
|
||
|
||
@app.route('/api/manual_getuid', methods=['POST'])
|
||
def api_manual_getuid():
|
||
"""手動讀取 UID API"""
|
||
data = request.get_json()
|
||
key_index = data.get('key_index', '').strip()
|
||
quiet_mode = data.get('quiet_mode', False)
|
||
|
||
thread = threading.Thread(target=run_manual_getuid, args=(key_index, quiet_mode))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始讀取 UID 流程'})
|
||
|
||
def run_manual_getuid(key_index, quiet_mode):
|
||
"""執行手動讀取 UID"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'getuid']
|
||
if key_index:
|
||
cmd.extend(['--key', key_index])
|
||
if quiet_mode:
|
||
cmd.append('--quiet')
|
||
|
||
socketio.emit('script_output', {'line': f'執行讀取 UID: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '讀取 UID 完成'})
|
||
|
||
@app.route('/api/manual_setsdm', methods=['POST'])
|
||
def api_manual_setsdm():
|
||
"""手動設定 SDM API"""
|
||
data = request.get_json()
|
||
url_index = data.get('url_index', '').strip()
|
||
key_index = data.get('key_index', '').strip()
|
||
quiet_mode = data.get('quiet_mode', False)
|
||
|
||
if not url_index:
|
||
return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400
|
||
|
||
thread = threading.Thread(target=run_manual_setsdm, args=(url_index, key_index, quiet_mode))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始設定 SDM 流程'})
|
||
|
||
def run_manual_setsdm(url_index, key_index, quiet_mode):
|
||
"""執行手動設定 SDM"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'setsdm', '--url', url_index]
|
||
if key_index:
|
||
cmd.extend(['--key', key_index])
|
||
if quiet_mode:
|
||
cmd.append('--quiet')
|
||
|
||
socketio.emit('script_output', {'line': f'執行設定 SDM: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '設定 SDM 完成'})
|
||
|
||
@app.route('/api/manual_changekey', methods=['POST'])
|
||
def api_manual_changekey():
|
||
"""手動變更金鑰 API"""
|
||
data = request.get_json()
|
||
auth_key = data.get('auth_key', '').strip()
|
||
new_key = data.get('new_key', '').strip()
|
||
old_key = data.get('old_key', '').strip()
|
||
key_no = data.get('key_no', '0').strip()
|
||
quiet_mode = data.get('quiet_mode', False)
|
||
|
||
if not auth_key or not new_key or not old_key:
|
||
return jsonify({'success': False, 'message': '認證金鑰、新金鑰和舊金鑰為必填欄位'}), 400
|
||
|
||
thread = threading.Thread(target=run_manual_changekey, args=(auth_key, new_key, old_key, key_no, quiet_mode))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始變更金鑰流程'})
|
||
|
||
def run_manual_changekey(auth_key, new_key, old_key, key_no, quiet_mode):
|
||
"""執行手動變更金鑰"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'changekey', '--auth-key', auth_key, '--new-key', new_key, '--old-key', old_key, '--key-no', key_no]
|
||
if quiet_mode:
|
||
cmd.append('--quiet')
|
||
|
||
socketio.emit('script_output', {'line': f'執行變更金鑰: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '變更金鑰完成'})
|
||
|
||
@app.route('/api/manual_writendef', methods=['POST'])
|
||
def api_manual_writendef():
|
||
"""手動寫入 NDEF API"""
|
||
data = request.get_json()
|
||
url_index = data.get('url_index', '').strip()
|
||
|
||
if not url_index:
|
||
return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400
|
||
|
||
thread = threading.Thread(target=run_manual_writendef, args=(url_index,))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始寫入 NDEF 流程'})
|
||
|
||
def run_manual_writendef(url_index):
|
||
"""執行手動寫入 NDEF"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'writendef', '--url-index', url_index]
|
||
|
||
socketio.emit('script_output', {'line': f'執行寫入 NDEF: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '寫入 NDEF 完成'})
|
||
|
||
@app.route('/api/manual_readndef', methods=['POST'])
|
||
def api_manual_readndef():
|
||
"""手動讀取 NDEF API"""
|
||
thread = threading.Thread(target=run_manual_readndef)
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始讀取 NDEF 流程'})
|
||
|
||
def run_manual_readndef():
|
||
"""執行手動讀取 NDEF"""
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
cmd = [f'./{exe_file}', 'readndef']
|
||
|
||
socketio.emit('script_output', {'line': f'執行讀取 NDEF: {" ".join(cmd)}'})
|
||
output_lines, success = run_and_emit(cmd, exe_dir)
|
||
socketio.emit('script_finished', {'message': '讀取 NDEF 完成'})
|
||
|
||
@app.route('/api/manual_custom_command', methods=['POST'])
|
||
def api_manual_custom_command():
|
||
"""手動自定義指令 API"""
|
||
data = request.get_json()
|
||
command = data.get('command', '').strip()
|
||
description = data.get('description', '').strip()
|
||
|
||
if not command:
|
||
return jsonify({'success': False, 'message': '指令不得為空'}), 400
|
||
|
||
# 基本安全檢查:確保指令以 ./nt4h_c_example 開頭
|
||
if not command.startswith('./nt4h_c_example'):
|
||
return jsonify({'success': False, 'message': '指令必須以 ./nt4h_c_example 開頭'}), 400
|
||
|
||
thread = threading.Thread(target=run_manual_custom_command, args=(command, description))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始執行自定義指令'})
|
||
|
||
def run_manual_custom_command(command, description=''):
|
||
"""執行手動自定義指令"""
|
||
# 使用絕對路徑
|
||
exe_dir = abspath('linux64_release')
|
||
|
||
# 解析指令
|
||
cmd_parts = command.split()
|
||
|
||
# 移除開頭的 ./nt4h_c_example,使用完整路徑
|
||
if cmd_parts[0] == './nt4h_c_example':
|
||
cmd_parts[0] = abspath('linux64_release/nt4h_c_example')
|
||
else:
|
||
# 如果不是以 ./nt4h_c_example 開頭,直接使用完整路徑
|
||
cmd_parts[0] = abspath('linux64_release/nt4h_c_example')
|
||
|
||
socketio.emit('script_output', {'line': f'執行自定義指令: {" ".join(cmd_parts)}'})
|
||
if description:
|
||
socketio.emit('script_output', {'line': f'指令說明: {description}'})
|
||
|
||
output_lines, success = run_and_emit(cmd_parts, exe_dir)
|
||
socketio.emit('script_finished', {'message': '自定義指令執行完成'})
|
||
|
||
@app.route('/api/check_nfc', methods=['POST'])
|
||
def api_check_nfc():
|
||
"""檢驗 NFC API"""
|
||
data = request.get_json()
|
||
key = data.get('key')
|
||
name = data.get('name', '')
|
||
if not key:
|
||
return jsonify({'success': False, 'message': '未選擇 key'}), 400
|
||
|
||
# 儲存名稱到 config
|
||
if name:
|
||
config['name'] = name
|
||
save_config(config)
|
||
|
||
thread = threading.Thread(target=run_check_nfc, args=(key, name))
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始檢驗 NFC 流程'})
|
||
|
||
def run_check_nfc(key_no, name=''):
|
||
"""執行檢驗 NFC"""
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
socketio.emit('script_output', {'line': '找不到 keys.txt,無法檢驗NFC'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
|
||
# 執行本地 CMAC 認證
|
||
socketio.emit('script_output', {'line': '🔍 開始檢驗 NFC...'})
|
||
cmd = [f'./{exe_file}', 'verify', '--key', key_no]
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['MAC 驗證成功'], stop_on_success=False)
|
||
|
||
# 檢查本地認證結果
|
||
if success:
|
||
socketio.emit('script_output', {'line': f'✅ 本地 CMAC 認證成功!'})
|
||
|
||
# 解析結果
|
||
uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, key_no, keys)
|
||
|
||
if uid:
|
||
socketio.emit('script_output', {'line': f'UID: {uid}'})
|
||
socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'})
|
||
socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'})
|
||
|
||
# POST 到資料 API
|
||
post_to_verify_api(uid, name)
|
||
else:
|
||
socketio.emit('script_output', {'line': '❌ 無法解析 UID'})
|
||
else:
|
||
# 檢查是否有 MAC 驗證失敗的錯誤碼
|
||
mac_failed = False
|
||
for line in output_lines:
|
||
if 'MAC 驗證失敗' in line and '0x' in line:
|
||
socketio.emit('script_output', {'line': f'❌ 本地 CMAC 認證失敗: {line.strip()}'})
|
||
mac_failed = True
|
||
break
|
||
|
||
if not mac_failed:
|
||
socketio.emit('script_output', {'line': '❌ 本地 CMAC 認證失敗'})
|
||
|
||
socketio.emit('script_finished', {'message': '檢驗 NFC 流程結束'})
|
||
|
||
@app.route('/api/test_nfc', methods=['POST'])
|
||
def api_test_nfc():
|
||
"""完整測試 NFC API"""
|
||
thread = threading.Thread(target=run_test_nfc)
|
||
thread.daemon = True
|
||
thread.start()
|
||
return jsonify({'success': True, 'message': '開始完整測試 NFC 流程'})
|
||
|
||
def run_test_nfc():
|
||
"""執行完整測試 NFC"""
|
||
keys_path = get_keys_path()
|
||
if not os.path.exists(keys_path):
|
||
socketio.emit('script_output', {'line': '找不到 keys.txt,無法執行完整測試'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
with open(keys_path, 'r', encoding='utf-8') as f:
|
||
keys = [line.strip() for line in f if line.strip()]
|
||
exe_dir = dirname(abspath(nt4h_exe_path))
|
||
exe_file = basename(nt4h_exe_path)
|
||
test_script_path = os.path.join(exe_dir, 'test_ntag424.sh')
|
||
|
||
socketio.emit('script_output', {'line': '🔍 開始完整測試 NFC 功能...'})
|
||
|
||
# 步驟1: 讀取UID找到可用的KEY
|
||
socketio.emit('script_output', {'line': '🔄 步驟1: 讀取UID尋找可用KEY...'})
|
||
oldkey, _ = find_valid_oldkey(keys, exe_file, exe_dir)
|
||
if not oldkey:
|
||
socketio.emit('script_output', {'line': '❌ 未找到可用的 oldkey,無法執行完整測試'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
|
||
socketio.emit('script_output', {'line': f'✅ 找到可用 oldkey:第{oldkey}行'})
|
||
|
||
# 步驟2: 強制更換到key 1
|
||
socketio.emit('script_output', {'line': '🔄 步驟2: 強制更換到key 1...'})
|
||
cmd = [f'./{exe_file}', 'changekey', '--auth-key', oldkey, '--new-key', '1', '--old-key', oldkey, '--key-no', '0']
|
||
output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功變更金鑰', '金鑰變更完成'], stop_on_success=False)
|
||
|
||
if not success:
|
||
socketio.emit('script_output', {'line': '❌ 強制更換到key 1失敗,無法執行完整測試'})
|
||
socketio.emit('script_finished', {'message': '流程結束'})
|
||
return
|
||
|
||
socketio.emit('script_output', {'line': '✅ 成功強制更換到key 1!'})
|
||
|
||
# 步驟3: 執行測試腳本
|
||
if os.path.exists(test_script_path):
|
||
socketio.emit('script_output', {'line': '🔄 步驟3: 執行完整測試腳本...'})
|
||
socketio.emit('script_output', {'line': f'測試腳本路徑: {test_script_path}'})
|
||
|
||
# 執行測試腳本
|
||
test_cmd = ['bash', test_script_path]
|
||
test_output_lines, test_success = run_and_emit(test_cmd, exe_dir, success_keywords=['🎉 所有測試都通過了!'])
|
||
|
||
if test_success:
|
||
socketio.emit('script_output', {'line': '✅ NFC 完整測試完成!所有測試都通過了!'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '⚠️ NFC 完整測試完成,但可能有問題'})
|
||
else:
|
||
socketio.emit('script_output', {'line': '❌ 找不到 test_ntag424.sh 測試腳本'})
|
||
socketio.emit('script_output', {'line': f'預期路徑: {test_script_path}'})
|
||
|
||
socketio.emit('script_finished', {'message': 'NFC 完整測試完成'})
|
||
|
||
@socketio.on('connect')
|
||
def ws_connect():
|
||
print(f"WebSocket 客戶端已連接: {request.sid}")
|
||
emit('connected', {'message': '已連接到伺服器'})
|
||
|
||
@socketio.on('disconnect')
|
||
def ws_disconnect():
|
||
print(f"WebSocket 客戶端已斷開: {request.sid}")
|
||
|
||
@socketio.on('error')
|
||
def ws_error(error):
|
||
print(f"WebSocket 錯誤: {error}")
|
||
|
||
if __name__ == '__main__':
|
||
print("NFC 生產工具與腳本執行監控整合版")
|
||
print("=" * 50)
|
||
#print(f"預設腳本路徑: {script_path}")
|
||
print("啟動網頁伺服器...")
|
||
print("請在瀏覽器中開啟: http://localhost:5000")
|
||
print("按 Ctrl+C 停止伺服器")
|
||
print("=" * 50)
|
||
socketio.run(app, host='0.0.0.0', port=5000, debug=False) |