NFCcreate-web/app.py
2025-09-25 19:04:00 +08:00

1193 lines
48 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
整合 NFC 生產工具與腳本執行監控
Flask + Flask-SocketIO
"""
# 必須在導入其他模組之前執行 eventlet.monkey_patch()
import eventlet
eventlet.monkey_patch()
import os
import subprocess
import threading
import queue
import re
from datetime import datetime
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
from os.path import dirname, basename, abspath
import json
import shutil
import urllib.parse
import requests
# Flask application object; every HTTP route below is registered on it.
app = Flask(__name__)
# NOTE(review): the Flask secret key is hard-coded in source — consider
# loading it from the environment or the config file instead.
app.config['SECRET_KEY'] = 'nfc_web_controller_secret_key'
# Socket.IO server used to push script output to browsers.
# NOTE(review): cors_allowed_origins="*" accepts connections from any origin.
socketio = SocketIO(app, cors_allowed_origins="*")
# Directory that holds the NFC tool and its keys.txt / urls.txt files
# (can be changed at runtime through /api/set_exe_dir).
exe_dir = 'linux64_release'
# JSON file read by load_config() and written by save_config().
CONFIG_FILE = 'config.json'
def load_config():
    """Return the settings stored in CONFIG_FILE.

    Falls back to ``{'api_url': ''}`` when the file does not exist.
    """
    if not os.path.exists(CONFIG_FILE):
        return {'api_url': ''}
    with open(CONFIG_FILE, 'r', encoding='utf-8') as config_file:
        settings = json.load(config_file)
    return settings
def save_config(config):
    """Persist *config* to CONFIG_FILE as pretty-printed UTF-8 JSON.

    The dictionary is serialized before the file is opened, so a value that
    cannot be encoded raises ``TypeError`` without truncating the existing
    configuration file.
    """
    serialized = json.dumps(config, ensure_ascii=False, indent=2)
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
# Load the persisted settings once at import time; the request handlers
# below read and mutate this module-level dict.
config = load_config()
def get_keys_path():
    """Return the path of keys.txt inside the current executable directory."""
    keys_file = os.path.join(exe_dir, 'keys.txt')
    return keys_file
def get_urls_path():
    """Return the path of urls.txt inside the current executable directory."""
    urls_file = os.path.join(exe_dir, 'urls.txt')
    return urls_file
def get_default_exe_path():
    """Return the default NFC executable path inside the current executable directory."""
    default_exe = os.path.join(exe_dir, 'nt4h_c_example')
    return default_exe
# Path of the NFC executable currently in use; starts as the default inside
# exe_dir and is reassigned by /api/set_exe_dir and /api/set_exe_path.
nt4h_exe_path = get_default_exe_path()
@app.route('/')
def index():
    """Serve the main page."""
    page = render_template('index.html')
    return page
@app.route('/api/set_exe_dir', methods=['POST'])
def api_set_exe_dir():
    """Switch the executable directory and reset the executable path to its default.

    Expects a JSON object with ``exe_dir``. Responds 400 when the body is
    missing or malformed, or when the value is not an existing directory.
    """
    global exe_dir, nt4h_exe_path
    # A missing or non-object JSON body is treated as "no fields supplied"
    # so the request fails with the 400 below instead of a 500.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    new_dir = data.get('exe_dir')
    if not isinstance(new_dir, str) or not new_dir or not os.path.isdir(new_dir):
        return jsonify({'success': False, 'message': '資料夾不存在'}), 400
    exe_dir = new_dir
    # The executable path follows the directory automatically.
    nt4h_exe_path = get_default_exe_path()
    return jsonify({'success': True, 'exe_dir': exe_dir, 'exe_path': nt4h_exe_path})
@app.route('/api/keys')
def api_keys():
    """Return every non-blank line of keys.txt as JSON, or 404 if the file is missing."""
    keys_path = get_keys_path()
    if not os.path.exists(keys_path):
        return jsonify({'success': False, 'message': '找不到 keys.txt'}), 404
    keys = []
    with open(keys_path, 'r', encoding='utf-8') as handle:
        for raw in handle:
            entry = raw.strip()
            if entry:
                keys.append(entry)
    return jsonify({'success': True, 'keys': keys})
@app.route('/api/urls')
def api_urls():
    """Return every non-blank line of urls.txt as JSON, or 404 if the file is missing."""
    urls_path = get_urls_path()
    if not os.path.exists(urls_path):
        return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
    urls = []
    with open(urls_path, 'r', encoding='utf-8') as handle:
        for raw in handle:
            entry = raw.strip()
            if entry:
                urls.append(entry)
    return jsonify({'success': True, 'urls': urls})
@app.route('/api/get_url_by_index', methods=['POST'])
def api_get_url_by_index():
    """Return the URL stored at a 1-based index of urls.txt.

    Expects a JSON object with ``index`` (string or integer). Responds 400
    for a missing, invalid or out-of-range index and 404 when urls.txt does
    not exist. The ``index`` value is echoed back as received.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    index = data.get('index')
    # JSON clients may send the index as a number; normalise to text so the
    # validation never raises on a non-string value.
    if index is None or isinstance(index, bool):
        index_text = ''
    else:
        index_text = str(index).strip()
    # isdecimal() only accepts characters that int() can parse
    # (isdigit() also accepts e.g. superscripts, which int() rejects).
    if not index_text.isdecimal():
        return jsonify({'success': False, 'message': '無效的索引'}), 400
    urls_path = get_urls_path()
    if not os.path.exists(urls_path):
        return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
    with open(urls_path, 'r', encoding='utf-8') as f:
        urls = [line.strip() for line in f if line.strip()]
    idx = int(index_text) - 1  # convert to a 0-based index
    if idx < 0 or idx >= len(urls):
        return jsonify({'success': False, 'message': '索引超出範圍'}), 400
    return jsonify({'success': True, 'url': urls[idx], 'index': index})
@app.route('/api/update_url_by_index', methods=['POST'])
def api_update_url_by_index():
    """Replace the URL at a 1-based index of urls.txt.

    Expects a JSON object with ``index`` (string or integer) and ``url``.
    Responds 400 for an invalid index or empty URL, 404 when urls.txt does
    not exist, and 500 when the file cannot be rewritten.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    index = data.get('index')
    raw_url = data.get('url', '')
    # Non-string values are treated as "not provided" instead of crashing.
    new_url = raw_url.strip() if isinstance(raw_url, str) else ''
    if index is None or isinstance(index, bool):
        index_text = ''
    else:
        index_text = str(index).strip()
    # isdecimal() only accepts characters that int() can parse.
    if not index_text.isdecimal():
        return jsonify({'success': False, 'message': '無效的索引'}), 400
    if not new_url:
        return jsonify({'success': False, 'message': 'URL 不得為空'}), 400
    urls_path = get_urls_path()
    if not os.path.exists(urls_path):
        return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
    # Read every URL currently stored.
    with open(urls_path, 'r', encoding='utf-8') as f:
        urls = [line.strip() for line in f if line.strip()]
    idx = int(index_text) - 1  # convert to a 0-based index
    if idx < 0 or idx >= len(urls):
        return jsonify({'success': False, 'message': '索引超出範圍'}), 400
    urls[idx] = new_url
    # Write the whole list back, one URL per line.
    try:
        with open(urls_path, 'w', encoding='utf-8') as f:
            f.writelines(f'{url}\n' for url in urls)
        return jsonify({'success': True, 'message': 'URL 更新成功'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
@app.route('/api/add_url', methods=['POST'])
def api_add_url():
    """Append a URL to urls.txt and report its 1-based index.

    Expects a JSON object with ``url``. Responds 400 for an empty URL, 404
    when urls.txt does not exist, and 500 when the file cannot be rewritten.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_url = data.get('url', '')
    # Non-string values are treated as "not provided" instead of crashing.
    new_url = raw_url.strip() if isinstance(raw_url, str) else ''
    if not new_url:
        return jsonify({'success': False, 'message': 'URL 不得為空'}), 400
    urls_path = get_urls_path()
    if not os.path.exists(urls_path):
        return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
    # Read the existing URLs, then append the new one.
    with open(urls_path, 'r', encoding='utf-8') as f:
        urls = [line.strip() for line in f if line.strip()]
    urls.append(new_url)
    # Write the whole list back, one URL per line.
    try:
        with open(urls_path, 'w', encoding='utf-8') as f:
            f.writelines(f'{url}\n' for url in urls)
        return jsonify({'success': True, 'message': 'URL 添加成功', 'new_index': len(urls)})
    except Exception as e:
        return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
@app.route('/api/delete_url_by_index', methods=['POST'])
def api_delete_url_by_index():
    """Remove the URL at a 1-based index of urls.txt and return the removed value.

    Expects a JSON object with ``index`` (string or integer). Responds 400
    for an invalid or out-of-range index, 404 when urls.txt does not exist,
    and 500 when the file cannot be rewritten.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    index = data.get('index')
    # JSON clients may send the index as a number; normalise to text first.
    if index is None or isinstance(index, bool):
        index_text = ''
    else:
        index_text = str(index).strip()
    # isdecimal() only accepts characters that int() can parse.
    if not index_text.isdecimal():
        return jsonify({'success': False, 'message': '無效的索引'}), 400
    urls_path = get_urls_path()
    if not os.path.exists(urls_path):
        return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404
    # Read every URL currently stored.
    with open(urls_path, 'r', encoding='utf-8') as f:
        urls = [line.strip() for line in f if line.strip()]
    idx = int(index_text) - 1  # convert to a 0-based index
    if idx < 0 or idx >= len(urls):
        return jsonify({'success': False, 'message': '索引超出範圍'}), 400
    deleted_url = urls.pop(idx)
    # Write the remaining URLs back, one per line.
    try:
        with open(urls_path, 'w', encoding='utf-8') as f:
            f.writelines(f'{url}\n' for url in urls)
        return jsonify({'success': True, 'message': 'URL 刪除成功', 'deleted_url': deleted_url})
    except Exception as e:
        return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
@app.route('/api/save_urls_file', methods=['POST'])
def api_save_urls_file():
    """Overwrite urls.txt with the non-blank lines of the submitted text.

    Expects a JSON object with ``urls_content`` (a string, one URL per line).
    Responds 400 when the field is not a string and 500 when the file cannot
    be written.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    urls_content = data.get('urls_content', '')
    if urls_content is None:
        urls_content = ''
    if not isinstance(urls_content, str):
        return jsonify({'success': False, 'message': 'urls_content 必須為字串'}), 400
    urls_path = get_urls_path()
    try:
        # Split into lines and drop the blank ones.
        urls = [line.strip() for line in urls_content.split('\n') if line.strip()]
        with open(urls_path, 'w', encoding='utf-8') as f:
            f.writelines(f'{url}\n' for url in urls)
        return jsonify({'success': True, 'message': 'URLs 文件保存成功', 'urls_count': len(urls)})
    except Exception as e:
        return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500
@app.route('/api/set_exe_path', methods=['POST'])
def api_set_exe_path():
    """Point the application at a different NFC executable.

    Expects a JSON object with ``exe_path``. Responds 400 when the body is
    missing or malformed, or when the value is not an existing file.
    """
    global nt4h_exe_path
    # A missing or non-object JSON body is treated as "no fields supplied".
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    exe_path = data.get('exe_path')
    if not isinstance(exe_path, str) or not exe_path or not os.path.isfile(exe_path):
        return jsonify({'success': False, 'message': '檔案不存在'}), 400
    nt4h_exe_path = exe_path
    return jsonify({'success': True, 'exe_path': nt4h_exe_path})
@app.route('/api/set_api_url', methods=['POST'])
def api_set_api_url():
    """Store the data-API base URL in the configuration file.

    Expects a JSON object with a non-empty string ``api_url``; responds 400
    otherwise.
    """
    global config
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    api_url = data.get('api_url')
    # The value is later interpolated into request URLs, so only accept text.
    if not isinstance(api_url, str) or not api_url:
        return jsonify({'success': False, 'message': '未提供 API URL'}), 400
    config['api_url'] = api_url
    save_config(config)
    return jsonify({'success': True, 'api_url': api_url})
@app.route('/api/set_api_token', methods=['POST'])
def api_set_api_token():
    """Store the data-API bearer token in the configuration file.

    Expects a JSON object with a non-empty string ``api_token``; responds 400
    otherwise.
    """
    global config
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    api_token = data.get('api_token')
    # The token is later placed in an Authorization header, so only accept text.
    if not isinstance(api_token, str) or not api_token:
        return jsonify({'success': False, 'message': '未提供 API Token'}), 400
    config['api_token'] = api_token
    save_config(config)
    return jsonify({'success': True, 'message': 'API Token 設定成功'})
@app.route('/api/get_api_token')
def api_get_api_token():
    """Return the stored API token (empty string when none is configured)."""
    # NOTE(review): this returns the full bearer token to any caller of the route.
    token = config.get('api_token', '')
    return jsonify({'api_token': token})
@app.route('/api/get_api_url')
def api_get_api_url():
    """Return the stored data-API URL (empty string when none is configured)."""
    stored_url = config.get('api_url', '')
    return jsonify({'api_url': stored_url})
@app.route('/api/get_name')
def api_get_name():
    """Return the currently selected name (empty string when none is set)."""
    current_name = config.get('name', '')
    return jsonify({'name': current_name})
@app.route('/api/set_name', methods=['POST'])
def api_set_name():
    """Store the currently selected name in the configuration file.

    Expects a JSON object with ``name``; a missing body, missing field or
    explicit null is stored as an empty string.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    name = data.get('name', '')
    if name is None:
        name = ''
    config['name'] = name
    save_config(config)
    return jsonify({'success': True, 'name': name})
@app.route('/api/get_names', methods=['GET'])
def api_get_names():
    """Return the saved name list together with the currently selected name."""
    response = {
        'names': config.get('names', []),
        'current': config.get('name', ''),
    }
    return jsonify(response)
@app.route('/api/add_name', methods=['POST'])
def api_add_name():
    """Add a name to the saved name list if it is not already present.

    Expects a JSON object with a non-blank string ``name``; responds 400
    otherwise. The response always reports an empty ``current`` selection.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_name = data.get('name', '')
    # Non-string values are treated as "not provided" instead of crashing.
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'success': False, 'message': '名稱不得為空'}), 400
    names = config.get('names', [])
    if name not in names:
        names.append(name)
    config['names'] = names
    # The new name is deliberately not made the current selection.
    save_config(config)
    return jsonify({'success': True, 'names': names, 'current': ''})
@app.route('/api/delete_name', methods=['POST'])
def api_delete_name():
    """Remove a name from the saved name list.

    Expects a JSON object with a non-blank string ``name``. Responds 400 when
    the name is empty or not in the list. If the removed name was the current
    selection, the selection is cleared.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_name = data.get('name', '')
    # Non-string values are treated as "not provided" instead of crashing.
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'success': False, 'message': '名稱不得為空'}), 400
    names = config.get('names', [])
    if name not in names:
        return jsonify({'success': False, 'message': '名稱不存在'}), 400
    names.remove(name)
    config['names'] = names
    # Clear the current selection when it was the deleted name.
    if config.get('name') == name:
        config['name'] = ''
    save_config(config)
    return jsonify({'success': True, 'names': names, 'current': config.get('name', '')})
def run_and_emit(cmd, exe_dir, success_keywords=None, stop_on_success=False):
    """Run *cmd* in *exe_dir* and stream its output to clients line by line.

    Each output line is emitted as a ``script_output`` Socket.IO event and
    collected. The command runs under a pseudo-terminal when the ``pty``
    module is available, otherwise through a pipe.

    Args:
        cmd: Argument list passed to ``subprocess.Popen``.
        exe_dir: Working directory for the child process.
        success_keywords: Optional iterable of substrings; a line containing
            any of them marks the run as successful.
        stop_on_success: When true, terminate the child once a success
            keyword has been seen.

    Returns:
        ``(output_lines, success)``. If launching or reading fails, the error
        is emitted to clients and ``([], False)`` is returned.
    """
    socketio.emit('script_output', {'line': f'$ {" ".join(cmd)}'})
    output_lines = []
    success = False

    def handle_line(text, keep_blank=False):
        """Emit and record one line; return True if it holds a success keyword."""
        nonlocal success
        clean_line = text.rstrip('\r\n')
        if not clean_line and not keep_blank:
            return False
        socketio.emit('script_output', {'line': clean_line})
        output_lines.append(clean_line)
        if success_keywords and any(kw in clean_line for kw in success_keywords):
            success = True
            return True
        return False

    try:
        import codecs
        import select

        # Ask Python-based children for unbuffered UTF-8 output.
        env = os.environ.copy()
        env['PYTHONUNBUFFERED'] = '1'
        env['PYTHONIOENCODING'] = 'utf-8'

        try:
            import pty
        except ImportError:
            pty = None

        if pty is None:
            # No pty support: read the child's output through a pipe.
            proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT, universal_newlines=True,
                                    bufsize=1, env=env)
            try:
                for line in iter(proc.stdout.readline, ''):
                    if handle_line(line, keep_blank=True) and stop_on_success:
                        # Stop the child so wait() below cannot block on a
                        # process whose output is no longer being read.
                        proc.terminate()
                        break
            finally:
                proc.stdout.close()
                proc.wait()
            return output_lines, success

        master, slave = pty.openpty()
        # Incremental decoding keeps multi-byte characters that straddle two
        # reads intact instead of dropping them.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
        pending = ''  # text received after the last newline seen so far

        def feed(chunk):
            """Decode *chunk*, emit completed lines, keep the unfinished tail."""
            nonlocal pending
            pending += decoder.decode(chunk)
            *complete, pending = pending.split('\n')
            matched = False
            for line in complete:
                matched = handle_line(line) or matched
            return matched

        try:
            try:
                proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=slave, stderr=slave,
                                        stdin=slave, env=env, preexec_fn=os.setsid)
            finally:
                # The parent never uses the slave end itself.
                os.close(slave)

            # Poll the pty while the child is alive.
            while proc.poll() is None:
                try:
                    ready, _, _ = select.select([master], [], [], 0.1)
                    if not ready:
                        continue
                    chunk = os.read(master, 1024)
                except OSError:
                    break
                if not chunk:
                    break
                if feed(chunk) and stop_on_success:
                    proc.terminate()
                    break

            # Drain whatever is still buffered, applying the same line and
            # keyword handling as above.
            while True:
                try:
                    ready, _, _ = select.select([master], [], [], 0.1)
                    if not ready:
                        break
                    chunk = os.read(master, 4096)
                except OSError:
                    break
                if not chunk:
                    break
                feed(chunk)

            # Flush a final line that was not newline-terminated.
            tail = pending + decoder.decode(b'', True)
            pending = ''
            if tail:
                handle_line(tail)
        finally:
            os.close(master)
        proc.wait()
        return output_lines, success
    except Exception as e:
        socketio.emit('script_output', {'line': f'執行指令時發生錯誤: {e}'})
        return [], False
def find_valid_oldkey(keys, exe_file, exe_dir):
    """Try ``getuid --key <n>`` for each key line number until one succeeds.

    Returns ``(key_number_as_str, output_lines)`` for the first key whose run
    reports success, or ``(None, [])`` when none does.
    """
    success_markers = ['成功讀取 UID', '=== UID 讀取完成 ===']
    for position, _ in enumerate(keys, start=1):
        key_no = str(position)
        command = [f'./{exe_file}', 'getuid', '--key', key_no]
        output_lines, success = run_and_emit(
            command, exe_dir, success_keywords=success_markers, stop_on_success=False)
        if not success:
            continue
        socketio.emit('script_output', {'line': f'找到可用 oldkey{key_no}'})
        return key_no, output_lines
    return None, []
@app.route('/api/run', methods=['POST'])
def api_run():
    """Start the "produce new SDM" workflow in a background thread.

    Expects a JSON object with ``key`` and ``url`` (strings or numbers) and
    an optional ``name``. Responds 400 when ``key`` or ``url`` is missing, so
    the workflow is never started with arguments that would make it fail
    after the tag key has already been changed.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    def text(field):
        # Command-line arguments must be strings; JSON may deliver numbers.
        value = data.get(field)
        return '' if value is None else str(value).strip()

    key = text('key')
    url = text('url')
    name = data.get('name', '')
    if not key:
        return jsonify({'success': False, 'message': '未選擇 key'}), 400
    if not url:
        return jsonify({'success': False, 'message': '未提供 URL'}), 400
    # Remember the selected name in the config.
    if name:
        config['name'] = name
        save_config(config)
    thread = threading.Thread(target=run_nfc_produce, args=(key, url, name))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始執行生產新SDM流程'})
def run_nfc_produce(newkey, url, name=''):
    """Run the full production workflow and stream progress to clients.

    Steps: find a working old key, change the key to *newkey*, configure SDM
    with *url*, run ``verify``, and on a parsable result POST it to the data
    API. A ``script_finished`` event is emitted on every exit path.

    Args:
        newkey: Key line number to install (string or number).
        url: Value passed to ``setsdm --url`` (string or number).
        name: Forwarded to ``post_to_api``.
    """
    def emit(line):
        socketio.emit('script_output', {'line': line})

    def finish(message='流程結束'):
        socketio.emit('script_finished', {'message': message})

    # Validate before touching the tag: a missing URL would otherwise only
    # fail after the key had already been changed.
    newkey = '' if newkey is None else str(newkey).strip()
    url = '' if url is None else str(url).strip()
    if not newkey or not url:
        emit('❌ 未提供 key 或 URL,流程結束')
        finish()
        return

    keys_path = get_keys_path()
    if not os.path.exists(keys_path):
        emit('找不到 keys.txt無法生產NFC')
        finish()
        return
    with open(keys_path, 'r', encoding='utf-8') as f:
        keys = [line.strip() for line in f if line.strip()]
    exe_dir = dirname(abspath(nt4h_exe_path))
    exe_file = basename(nt4h_exe_path)

    oldkey, _ = find_valid_oldkey(keys, exe_file, exe_dir)
    if not oldkey:
        emit('未找到可用的 oldkey流程結束')
        finish()
        return

    # Step 1: changekey
    emit('🔄 開始變更金鑰...')
    cmd = [f'./{exe_file}', 'changekey', '--auth-key', oldkey, '--new-key', newkey,
           '--old-key', oldkey, '--key-no', '0']
    _, success = run_and_emit(cmd, exe_dir, success_keywords=['成功變更金鑰', '金鑰變更完成'],
                              stop_on_success=False)
    if not success:
        emit('❌ 變更金鑰失敗,流程結束')
        finish()
        return
    emit('✅ 成功變更金鑰!')

    # Step 2: setsdm
    emit('🔄 開始設定 SDM...')
    cmd = [f'./{exe_file}', 'setsdm', '--url', url, '--key', newkey]
    _, success = run_and_emit(cmd, exe_dir, success_keywords=['快速 SDM 設定完成', 'SDM 設定完成'],
                              stop_on_success=False)
    if not success:
        emit('❌ 設定 SDM 失敗,流程結束')
        finish()
        return
    emit('✅ 成功設定 SDM')

    # Step 3: verify; the outcome is judged from the parsed output.
    emit('🔄 開始 CMAC 認證...')
    cmd = [f'./{exe_file}', 'verify', '--key', newkey]
    output_lines, _ = run_and_emit(cmd, exe_dir)
    uid, sdm_ctr, cmac, _ = parse_verify_output(output_lines, newkey, keys)
    if uid and sdm_ctr and cmac:
        emit(f'✅ CMAC 認證成功!')
        emit(f'UID: {uid}')
        emit(f'SDM 讀取計數器: {sdm_ctr}')
        emit(f'ASCII MAC 資料: {cmac}')
        # Report the produced tag to the data API.
        post_to_api(uid, sdm_ctr, cmac, newkey, name)
    else:
        emit('❌ CMAC 認證失敗,無法解析結果')
    finish('生產新SDM流程結束')
@app.route('/api/read_nfc', methods=['POST'])
def api_read_nfc():
    """Start the batch NFC read in a background daemon thread."""
    worker = threading.Thread(target=read_nfc, daemon=True)
    worker.start()
    return jsonify({'success': True, 'message': '開始批次讀取所有NFC'})
def read_nfc():
    """Try ``getuid`` with every key line number until one run reports success.

    Always ends by emitting ``script_finished``.
    """
    keys_path = get_keys_path()
    if not os.path.exists(keys_path):
        socketio.emit('script_output', {'line': '找不到 keys.txt無法批次讀取NFC'})
        socketio.emit('script_finished', {'message': '讀取結束'})
        return
    keys = []
    with open(keys_path, 'r', encoding='utf-8') as handle:
        for raw in handle:
            if raw.strip():
                keys.append(raw.strip())
    workdir = dirname(abspath(nt4h_exe_path))
    exe_file = basename(nt4h_exe_path)
    success_markers = ['成功讀取 UID', '=== UID 讀取完成 ===']
    for position, _ in enumerate(keys, start=1):
        command = [f'./{exe_file}', 'getuid', '--key', str(position)]
        _, success = run_and_emit(
            command, workdir, success_keywords=success_markers, stop_on_success=False)
        if success:
            socketio.emit('script_output', {'line': f'成功讀取到 UID停止讀取'})
            break
    socketio.emit('script_finished', {'message': '讀取結束'})
@app.route('/api/verify', methods=['POST'])
def api_verify():
    """Start the CMAC verification workflow in a background thread.

    Expects a JSON object with ``key`` (string or number) and an optional
    ``name``. Responds 400 when ``key`` is missing.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_key = data.get('key')
    # The key becomes a command-line argument, so it must be text.
    key = '' if raw_key is None else str(raw_key).strip()
    name = data.get('name', '')
    if not key:
        return jsonify({'success': False, 'message': '未選擇 key'}), 400
    # Remember the selected name in the config.
    if name:
        config['name'] = name
        save_config(config)
    thread = threading.Thread(target=verify_nfc, args=(key, name))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始 CMAC 認證流程'})
def parse_verify_output(output_lines, key_no, keys):
    """Extract UID, SDM counter and MAC text from ``verify`` output lines.

    Args:
        output_lines: Lines printed by the ``verify`` command.
        key_no: 1-based key line number, as a string or an integer.
        keys: Key values in file order.

    Returns:
        ``(uid, sdm_ctr, cmac, key_value)``. The first three are ``None``
        when their marker never appears; when a marker appears on several
        lines the last occurrence wins. ``key_value`` is ``keys[key_no - 1]``
        or ``''`` when *key_no* is missing, not a number, or out of range.
    """
    # Marker text -> slot in the result; per line, the first marker found
    # (in this order) is the only one applied.
    markers = (('UID:', 'uid'), ('SDM 讀取計數器:', 'sdm_ctr'), ('ASCII MAC 資料:', 'cmac'))
    found = {'uid': None, 'sdm_ctr': None, 'cmac': None}
    for line in output_lines:
        for marker, slot in markers:
            if marker in line:
                found[slot] = line.split(marker)[1].strip()
                break

    # Accept integer key numbers too; isdecimal() only admits text that
    # int() can parse.
    if key_no is None or isinstance(key_no, bool):
        key_text = ''
    else:
        key_text = str(key_no).strip()
    key_value = ''
    if key_text.isdecimal():
        position = int(key_text)
        if 0 < position <= len(keys):
            key_value = keys[position - 1]
    return found['uid'], found['sdm_ctr'], found['cmac'], key_value
def post_to_api(uid, sdm_ctr, cmac, key_index, name=''):
    """POST a produced tag's UID, SDM counter and key index to the data API.

    The request goes to ``<api_url>/nfc/insertForProduction`` with a bearer
    token. Progress and the outcome are streamed to clients; nothing is
    returned and no exception is propagated. The request is skipped when the
    API URL or token is not configured. ``cmac`` and ``name`` are accepted
    but are not part of the payload.
    """
    def emit(line):
        socketio.emit('script_output', {'line': line})

    api_url = config.get('api_url', '')
    if not api_url:
        emit('⚠️ 未設定資料API URL跳過POST請求')
        return
    api_token = config.get('api_token', '')
    if not api_token:
        emit('⚠️ 未設定 API Token跳過POST請求')
        return

    # Strip trailing slashes so a configured "https://host/" does not
    # produce "https://host//nfc/...".
    post_url = f"{str(api_url).rstrip('/')}/nfc/insertForProduction"
    json_data = {
        'uid': uid,
        'ctr': sdm_ctr,
        'key': key_index
    }
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    emit(f'🔄 正在發送資料到 API: {post_url}')
    emit(f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}')
    # NOTE(review): this broadcasts the first 20 characters of the token to
    # every connected client — consider masking it further.
    emit(f'🔐 使用 API Token: {api_token[:20]}...')
    try:
        response = requests.post(post_url, json=json_data, headers=headers, timeout=30)
        emit(f'📡 API 回應狀態碼: {response.status_code}')
        if response.status_code == 200:
            emit('✅ 資料成功發送到 API')
            try:
                # An empty body is reported as such rather than parsed.
                if response.text and response.text.strip():
                    response_data = response.json()
                    emit(f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}')
                else:
                    emit('📄 API 回應: (空回應)')
            except json.JSONDecodeError:
                emit(f'📄 API 回應 (非JSON格式): {response.text}')
            except Exception as e:
                emit(f'📄 API 回應解析錯誤: {str(e)}')
                emit(f'📄 原始回應: {response.text}')
        else:
            if response.status_code == 401:
                emit('❌ API 認證失敗,請檢查 API Token 是否正確')
            elif response.status_code == 500:
                emit('❌ 伺服器內部錯誤 (500)')
            else:
                emit(f'❌ API 請求失敗,狀態碼: {response.status_code}')
            emit(f'📄 錯誤回應: {response.text if response.text else "(空回應)"}')
    except requests.exceptions.Timeout:
        emit('❌ API 請求超時')
    except requests.exceptions.ConnectionError:
        emit('❌ 無法連接到 API 伺服器')
    except Exception as e:
        emit(f'❌ API 請求發生錯誤: {str(e)}')
def verify_nfc(key_no, name=''):
    """Run local CMAC verification and, when it succeeds, the remote check.

    Streams progress to clients and always ends with ``script_finished``
    (with a different message when keys.txt is missing).
    """
    def emit(line):
        socketio.emit('script_output', {'line': line})

    keys_path = get_keys_path()
    if not os.path.exists(keys_path):
        emit('找不到 keys.txt無法認證')
        socketio.emit('script_finished', {'message': '流程結束'})
        return
    with open(keys_path, 'r', encoding='utf-8') as handle:
        keys = [raw.strip() for raw in handle if raw.strip()]
    workdir = dirname(abspath(nt4h_exe_path))
    exe_file = basename(nt4h_exe_path)

    # Local CMAC verification.
    command = [f'./{exe_file}', 'verify', '--key', key_no]
    output_lines, success = run_and_emit(
        command, workdir, success_keywords=['MAC 驗證成功'], stop_on_success=False)

    if not success:
        # Prefer the tool's own failure line when it carries an error code.
        failure_line = next(
            (line for line in output_lines if 'MAC 驗證失敗' in line and '0x' in line),
            None)
        if failure_line is None:
            emit('❌ 本地 CMAC 認證失敗')
        else:
            emit(f'❌ 本地 CMAC 認證失敗: {failure_line.strip()}')
    else:
        emit(f'✅ 本地 CMAC 認證成功!')
        uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, key_no, keys)
        if not (uid and sdm_ctr and cmac):
            emit('❌ 無法解析認證結果')
        else:
            emit(f'UID: {uid}')
            emit(f'SDM 讀取計數器: {sdm_ctr}')
            emit(f'ASCII MAC 資料: {cmac}')
            # Continue with the remote verification.
            verify_remote(uid, sdm_ctr, cmac, key_no)
    socketio.emit('script_finished', {'message': 'CMAC 認證流程結束'})
def verify_remote(uid, sdm_ctr, cmac, key_no):
    """Ask the remote verification service to validate a tag's CMAC.

    Sends a GET request carrying ``uid``, ``ctr``, ``cmac`` and ``key`` as
    query parameters and streams the outcome to clients. Nothing is returned
    and no exception is propagated.
    """
    def emit(line):
        socketio.emit('script_output', {'line': line})

    # URL-encode the values so characters such as '&', '#' or spaces cannot
    # corrupt the query string.
    query = urllib.parse.urlencode(
        {'uid': uid, 'ctr': sdm_ctr, 'cmac': cmac, 'key': key_no})
    verify_url = f"https://verify.contree.app/verify?{query}"
    emit('🔄 開始遠端認證...')
    emit(f'認證 URL: {verify_url}')
    try:
        response = requests.get(verify_url, timeout=30)
        emit(f'📡 遠端認證回應狀態碼: {response.status_code}')
        if response.status_code != 200:
            emit(f'❌ 遠端認證請求失敗,狀態碼: {response.status_code}')
            emit(f'📄 錯誤回應: {response.text if response.text else "(空回應)"}')
            return
        try:
            # An empty body is reported as such rather than parsed.
            if not (response.text and response.text.strip()):
                emit('📄 遠端認證回應: (空回應)')
                return
            response_data = response.json()
            if response_data.get('success') == True:
                emit('✅ 線上認證成功!')
                emit(f'📄 認證訊息: {response_data.get("message", "")}')
                emit(f'📄 認證時間: {response_data.get("timestamp", "")}')
                # Show the detail section when the service includes one.
                details = response_data.get('details', {})
                if details:
                    emit(f'📄 認證詳情:')
                    emit(f' UID: {details.get("uid", "")}')
                    emit(f' 計數器: {details.get("counter", "")}')
                    emit(f' CMAC: {details.get("cmac", "")}')
                    emit(f' 金鑰索引: {details.get("key_index", "")}')
            else:
                emit('❌ 線上認證失敗')
                emit(f'📄 錯誤訊息: {response_data.get("message", "")}')
        except json.JSONDecodeError:
            emit(f'📄 遠端認證回應 (非JSON格式): {response.text}')
        except Exception as e:
            emit(f'📄 遠端認證回應解析錯誤: {str(e)}')
            emit(f'📄 原始回應: {response.text}')
    except requests.exceptions.Timeout:
        emit('❌ 遠端認證請求超時')
    except requests.exceptions.ConnectionError:
        emit('❌ 無法連接到遠端認證服務')
    except Exception as e:
        emit(f'❌ 遠端認證發生錯誤: {str(e)}')
def post_to_verify_api(uid, name=''):
    """POST a tag UID to the data API for inspection.

    The request goes to ``<api_url>/nfc/verifyForProduction`` with a bearer
    token. Progress and the outcome are streamed to clients; nothing is
    returned and no exception is propagated. The request is skipped when the
    API URL or token is not configured. ``name`` is accepted but is not part
    of the payload.
    """
    def emit(line):
        socketio.emit('script_output', {'line': line})

    api_url = config.get('api_url', '')
    if not api_url:
        emit('⚠️ 未設定資料API URL跳過POST請求')
        return
    api_token = config.get('api_token', '')
    if not api_token:
        emit('⚠️ 未設定 API Token跳過POST請求')
        return

    # Strip trailing slashes so a configured "https://host/" does not
    # produce "https://host//nfc/...".
    post_url = f"{str(api_url).rstrip('/')}/nfc/verifyForProduction"
    json_data = {
        'uid': uid
    }
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    emit(f'🔄 正在發送檢驗資料到 API: {post_url}')
    emit(f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}')
    # NOTE(review): this broadcasts the first 20 characters of the token to
    # every connected client — consider masking it further.
    emit(f'🔐 使用 API Token: {api_token[:20]}...')
    try:
        response = requests.post(post_url, json=json_data, headers=headers, timeout=30)
        emit(f'📡 API 回應狀態碼: {response.status_code}')
        if response.status_code == 200:
            emit('✅ 檢驗資料成功發送到 API')
            try:
                # An empty body is reported as such rather than parsed.
                if response.text and response.text.strip():
                    response_data = response.json()
                    emit(f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}')
                else:
                    emit('📄 API 回應: (空回應)')
            except json.JSONDecodeError:
                emit(f'📄 API 回應 (非JSON格式): {response.text}')
            except Exception as e:
                emit(f'📄 API 回應解析錯誤: {str(e)}')
                emit(f'📄 原始回應: {response.text}')
        else:
            if response.status_code == 401:
                emit('❌ API 認證失敗,請檢查 API Token 是否正確')
            elif response.status_code == 500:
                emit('❌ 伺服器內部錯誤 (500)')
            else:
                emit(f'❌ API 請求失敗,狀態碼: {response.status_code}')
            emit(f'📄 錯誤回應: {response.text if response.text else "(空回應)"}')
    except requests.exceptions.Timeout:
        emit('❌ API 請求超時')
    except requests.exceptions.ConnectionError:
        emit('❌ 無法連接到 API 伺服器')
    except Exception as e:
        emit(f'❌ API 請求發生錯誤: {str(e)}')
# ------------------ Manual test features ------------------
@app.route('/manual_test')
def manual_test_page():
    """Serve the manual test page."""
    page = render_template('manual_test.html')
    return page
@app.route('/api/manual_verify', methods=['POST'])
def api_manual_verify():
    """Start a manual verification run in a background thread.

    Expects a JSON object with ``uid``, ``ctr`` and ``cmac`` (required) plus
    optional ``key_index``, ``url_index``, ``full_url`` and ``quiet_mode``.
    Text fields may be sent as strings or numbers. Responds 400 when a
    required field is missing.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    def text(field):
        # Values become command-line arguments, so normalise them to text.
        value = data.get(field)
        return '' if value is None else str(value).strip()

    uid = text('uid')
    ctr = text('ctr')
    cmac = text('cmac')
    key_index = text('key_index')
    url_index = text('url_index')
    full_url = text('full_url')
    quiet_mode = data.get('quiet_mode', False)
    if not uid or not ctr or not cmac:
        return jsonify({'success': False, 'message': 'UID、計數器和 CMAC 為必填欄位'}), 400
    thread = threading.Thread(
        target=run_manual_verify,
        args=(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始手動驗證流程'})
def run_manual_verify(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode):
    """Build and run ``verify --manual`` and stream its output.

    When *full_url* is given the command carries only the URL (plus the
    optional key); otherwise it carries uid/ctr/cmac with the optional key
    and URL index.
    """
    workdir = dirname(abspath(nt4h_exe_path))
    program = f'./{basename(nt4h_exe_path)}'
    if full_url:
        cmd = [program, 'verify', '--manual', '--url', full_url]
        if key_index:
            cmd += ['--key', key_index]
    else:
        cmd = [program, 'verify', '--manual', '--uid', uid, '--ctr', ctr, '--cmac', cmac]
        if key_index:
            cmd += ['--key', key_index]
        if url_index:
            cmd += ['--url-index', url_index]
    if quiet_mode:
        cmd += ['--quiet']
    socketio.emit('script_output', {'line': f'執行手動驗證: {" ".join(cmd)}'})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '手動驗證完成'})
@app.route('/api/manual_getuid', methods=['POST'])
def api_manual_getuid():
    """Start a manual UID read in a background thread.

    Accepts an optional JSON object with ``key_index`` (string or number)
    and ``quiet_mode``; a missing body is treated as no options.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_key = data.get('key_index')
    # The value becomes a command-line argument, so normalise it to text.
    key_index = '' if raw_key is None else str(raw_key).strip()
    quiet_mode = data.get('quiet_mode', False)
    thread = threading.Thread(target=run_manual_getuid, args=(key_index, quiet_mode))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始讀取 UID 流程'})
def run_manual_getuid(key_index, quiet_mode):
    """Run ``getuid`` with the optional key index and quiet flag, streaming its output."""
    workdir = dirname(abspath(nt4h_exe_path))
    cmd = [f'./{basename(nt4h_exe_path)}', 'getuid']
    if key_index:
        cmd += ['--key', key_index]
    if quiet_mode:
        cmd += ['--quiet']
    socketio.emit('script_output', {'line': f'執行讀取 UID: {" ".join(cmd)}'})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '讀取 UID 完成'})
@app.route('/api/manual_setsdm', methods=['POST'])
def api_manual_setsdm():
    """Start a manual SDM configuration run in a background thread.

    Expects a JSON object with ``url_index`` (required) and optional
    ``key_index`` and ``quiet_mode``. Text fields may be strings or numbers.
    Responds 400 when ``url_index`` is missing.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    def text(field):
        # Values become command-line arguments, so normalise them to text.
        value = data.get(field)
        return '' if value is None else str(value).strip()

    url_index = text('url_index')
    key_index = text('key_index')
    quiet_mode = data.get('quiet_mode', False)
    if not url_index:
        return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400
    thread = threading.Thread(target=run_manual_setsdm, args=(url_index, key_index, quiet_mode))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始設定 SDM 流程'})
def run_manual_setsdm(url_index, key_index, quiet_mode):
    """Run ``setsdm --url <url_index>`` with the optional key and quiet flag, streaming its output."""
    workdir = dirname(abspath(nt4h_exe_path))
    cmd = [f'./{basename(nt4h_exe_path)}', 'setsdm', '--url', url_index]
    if key_index:
        cmd += ['--key', key_index]
    if quiet_mode:
        cmd += ['--quiet']
    socketio.emit('script_output', {'line': f'執行設定 SDM: {" ".join(cmd)}'})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '設定 SDM 完成'})
@app.route('/api/manual_changekey', methods=['POST'])
def api_manual_changekey():
    """Start a manual key change in a background thread.

    Expects a JSON object with ``auth_key``, ``new_key`` and ``old_key``
    (required), plus optional ``key_no`` (defaults to ``'0'``) and
    ``quiet_mode``. Text fields may be strings or numbers. Responds 400 when
    a required field is missing.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    def text(field, default=''):
        # Values become command-line arguments, so normalise them to text;
        # a missing or null field falls back to *default*.
        value = data.get(field)
        return default if value is None else str(value).strip()

    auth_key = text('auth_key')
    new_key = text('new_key')
    old_key = text('old_key')
    key_no = text('key_no', '0')
    quiet_mode = data.get('quiet_mode', False)
    if not auth_key or not new_key or not old_key:
        return jsonify({'success': False, 'message': '認證金鑰、新金鑰和舊金鑰為必填欄位'}), 400
    thread = threading.Thread(
        target=run_manual_changekey,
        args=(auth_key, new_key, old_key, key_no, quiet_mode))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始變更金鑰流程'})
def run_manual_changekey(auth_key, new_key, old_key, key_no, quiet_mode):
    """Run ``changekey`` with the given keys and key number, streaming its output."""
    workdir = dirname(abspath(nt4h_exe_path))
    cmd = [
        f'./{basename(nt4h_exe_path)}', 'changekey',
        '--auth-key', auth_key,
        '--new-key', new_key,
        '--old-key', old_key,
        '--key-no', key_no,
    ]
    if quiet_mode:
        cmd += ['--quiet']
    socketio.emit('script_output', {'line': f'執行變更金鑰: {" ".join(cmd)}'})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '變更金鑰完成'})
@app.route('/api/manual_writendef', methods=['POST'])
def api_manual_writendef():
    """Start a manual NDEF write in a background thread.

    Expects a JSON object with ``url_index`` (string or number). Responds
    400 when it is missing.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_index = data.get('url_index')
    # The value becomes a command-line argument, so normalise it to text.
    url_index = '' if raw_index is None else str(raw_index).strip()
    if not url_index:
        return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400
    thread = threading.Thread(target=run_manual_writendef, args=(url_index,))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始寫入 NDEF 流程'})
def run_manual_writendef(url_index):
    """Run ``writendef --url-index <url_index>`` and stream its output."""
    workdir = dirname(abspath(nt4h_exe_path))
    cmd = [f'./{basename(nt4h_exe_path)}', 'writendef', '--url-index', url_index]
    announcement = f'執行寫入 NDEF: {" ".join(cmd)}'
    socketio.emit('script_output', {'line': announcement})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '寫入 NDEF 完成'})
@app.route('/api/manual_readndef', methods=['POST'])
def api_manual_readndef():
    """Start a manual NDEF read in a background daemon thread."""
    worker = threading.Thread(target=run_manual_readndef, daemon=True)
    worker.start()
    return jsonify({'success': True, 'message': '開始讀取 NDEF 流程'})
def run_manual_readndef():
    """Run ``readndef`` and stream its output."""
    workdir = dirname(abspath(nt4h_exe_path))
    cmd = [f'./{basename(nt4h_exe_path)}', 'readndef']
    announcement = f'執行讀取 NDEF: {" ".join(cmd)}'
    socketio.emit('script_output', {'line': announcement})
    run_and_emit(cmd, workdir)
    socketio.emit('script_finished', {'message': '讀取 NDEF 完成'})
@app.route('/api/manual_custom_command', methods=['POST'])
def api_manual_custom_command():
    """Start a user-supplied tool command in a background thread.

    Expects a JSON object with ``command`` (required) and an optional
    ``description``. Responds 400 when the command is empty or its first
    word is not exactly ``./nt4h_c_example``.
    """
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    raw_command = data.get('command', '')
    raw_description = data.get('description', '')
    # Non-string values are treated as "not provided" instead of crashing.
    command = raw_command.strip() if isinstance(raw_command, str) else ''
    description = raw_description.strip() if isinstance(raw_description, str) else ''
    if not command:
        return jsonify({'success': False, 'message': '指令不得為空'}), 400
    # Basic safety check: the program word must be exactly ./nt4h_c_example.
    # A plain prefix test would also accept e.g. "./nt4h_c_exampleXYZ".
    if command.split(None, 1)[0] != './nt4h_c_example':
        return jsonify({'success': False, 'message': '指令必須以 ./nt4h_c_example 開頭'}), 400
    thread = threading.Thread(target=run_manual_custom_command, args=(command, description))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始執行自定義指令'})
def run_manual_custom_command(command, description=''):
    """Run a user-supplied command line against the configured executable.

    The command is split on whitespace and its first token is always replaced
    by the absolute path of the currently configured executable
    (``nt4h_exe_path``), so only that binary can be launched. The process is
    run through ``run_and_emit`` from the executable's directory.

    Args:
        command: Full command line; only the tokens after the first are used
            as arguments.
        description: Optional free-text note emitted before execution.
    """
    cmd_parts = command.split()
    if not cmd_parts:
        # Nothing to run; report it instead of failing on cmd_parts[0].
        socketio.emit('script_output', {'line': '❌ 指令不得為空'})
        socketio.emit('script_finished', {'message': '流程結束'})
        return

    # Follow the configured executable path (as the other run_* helpers do)
    # rather than a hard-coded directory, so a changed executable location is
    # honoured here as well.
    exe_path = abspath(nt4h_exe_path)
    exe_dir = dirname(exe_path)

    # Whatever the first token was, execute the configured binary by full path.
    cmd_parts[0] = exe_path

    socketio.emit('script_output', {'line': f'執行自定義指令: {" ".join(cmd_parts)}'})
    if description:
        socketio.emit('script_output', {'line': f'指令說明: {description}'})
    run_and_emit(cmd_parts, exe_dir)
    socketio.emit('script_finished', {'message': '自定義指令執行完成'})
@app.route('/api/check_nfc', methods=['POST'])
def api_check_nfc():
    """Start the NFC verification flow in a background thread.

    Expects a JSON object with ``key`` (required) and ``name`` (optional).
    Responds with HTTP 400 when no key is supplied. A non-empty ``name`` is
    stored in the module-level ``config`` and persisted via ``save_config``
    before a daemon thread running ``run_check_nfc`` is started.
    """
    # silent=True returns None instead of raising on a missing or malformed
    # JSON body; non-object payloads are treated as empty so validation below
    # answers with a 400 rather than an unhandled AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Normalise the key to text: it is later used as a command-line argument,
    # and a numeric 0 must not be mistaken for "no key selected".
    raw_key = data.get('key')
    key = '' if raw_key is None else str(raw_key).strip()
    name = data.get('name') or ''
    if not key:
        return jsonify({'success': False, 'message': '未選擇 key'}), 400

    # Persist the name to the config file.
    if name:
        config['name'] = name
        save_config(config)

    thread = threading.Thread(target=run_check_nfc, args=(key, name))
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': '開始檢驗 NFC 流程'})
def run_check_nfc(key_no, name=''):
    """Run the executable's ``verify`` sub-command and report the outcome.

    Progress is sent on the ``script_output`` event and the flow always ends
    with a ``script_finished`` event. When ``run_and_emit`` reports success,
    the output is parsed with ``parse_verify_output`` and, if a UID was
    found, ``post_to_verify_api`` is called with the UID and ``name``.

    Args:
        key_no: Key selector passed to ``--key`` and to ``parse_verify_output``.
        name: Name forwarded to ``post_to_verify_api``.
    """
    def report(text):
        socketio.emit('script_output', {'line': text})

    key_file = get_keys_path()
    if not os.path.exists(key_file):
        report('找不到 keys.txt無法檢驗NFC')
        socketio.emit('script_finished', {'message': '流程結束'})
        return

    # Load every non-blank line of the key file, stripped of whitespace.
    with open(key_file, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        keys = [entry for entry in stripped_lines if entry]

    workdir = dirname(abspath(nt4h_exe_path))
    binary = basename(nt4h_exe_path)

    # Local CMAC verification.
    report('🔍 開始檢驗 NFC...')
    verify_cmd = ['./' + binary, 'verify', '--key', key_no]
    output_lines, verified = run_and_emit(
        verify_cmd,
        workdir,
        success_keywords=['MAC 驗證成功'],
        stop_on_success=False,
    )

    if not verified:
        # Prefer the first output line that carries a MAC failure with an
        # error code; fall back to a generic failure message otherwise.
        for entry in output_lines:
            if 'MAC 驗證失敗' in entry and '0x' in entry:
                report(f'❌ 本地 CMAC 認證失敗: {entry.strip()}')
                break
        else:
            report('❌ 本地 CMAC 認證失敗')
    else:
        report('✅ 本地 CMAC 認證成功!')
        uid, sdm_ctr, cmac, _key_value = parse_verify_output(output_lines, key_no, keys)
        if not uid:
            report('❌ 無法解析 UID')
        else:
            report(f'UID: {uid}')
            report(f'SDM 讀取計數器: {sdm_ctr}')
            report(f'ASCII MAC 資料: {cmac}')
            # Forward the result to the data API.
            post_to_verify_api(uid, name)

    socketio.emit('script_finished', {'message': '檢驗 NFC 流程結束'})
@app.route('/api/test_nfc', methods=['POST'])
def api_test_nfc():
    """Start the full NFC test flow on a daemon thread and acknowledge at once."""
    worker = threading.Thread(target=run_test_nfc, daemon=True)
    worker.start()
    payload = {'success': True, 'message': '開始完整測試 NFC 流程'}
    return jsonify(payload)
def run_test_nfc():
    """Run the full NFC test flow and report progress over Socket.IO.

    Steps, each announced on the ``script_output`` event:
      1. Look up a usable old key with ``find_valid_oldkey``.
      2. Run the executable's ``changekey`` sub-command with ``--new-key 1``
         and ``--key-no 0``, authenticating with the key found in step 1.
      3. Run ``test_ntag424.sh`` (located next to the executable) with bash.

    The flow stops early when the key file is missing, no old key is found,
    or the key change is not reported as successful. A ``script_finished``
    event is emitted on every exit path.
    """
    def report(text):
        socketio.emit('script_output', {'line': text})

    def finish(message):
        socketio.emit('script_finished', {'message': message})

    key_file = get_keys_path()
    if not os.path.exists(key_file):
        report('找不到 keys.txt無法執行完整測試')
        finish('流程結束')
        return

    # Load every non-blank line of the key file, stripped of whitespace.
    with open(key_file, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        keys = [entry for entry in stripped_lines if entry]

    workdir = dirname(abspath(nt4h_exe_path))
    binary = basename(nt4h_exe_path)
    script_file = os.path.join(workdir, 'test_ntag424.sh')

    report('🔍 開始完整測試 NFC 功能...')

    # Step 1: find a key that currently works.
    report('🔄 步驟1: 讀取UID尋找可用KEY...')
    oldkey, _ = find_valid_oldkey(keys, binary, workdir)
    if not oldkey:
        report('❌ 未找到可用的 oldkey無法執行完整測試')
        finish('流程結束')
        return
    report(f'✅ 找到可用 oldkey{oldkey}')

    # Step 2: force the change to key 1.
    report('🔄 步驟2: 強制更換到key 1...')
    change_cmd = [
        './' + binary, 'changekey',
        '--auth-key', oldkey,
        '--new-key', '1',
        '--old-key', oldkey,
        '--key-no', '0',
    ]
    _change_output, changed = run_and_emit(
        change_cmd,
        workdir,
        success_keywords=['成功變更金鑰', '金鑰變更完成'],
        stop_on_success=False,
    )
    if not changed:
        report('❌ 強制更換到key 1失敗無法執行完整測試')
        finish('流程結束')
        return
    report('✅ 成功強制更換到key 1')

    # Step 3: run the test script when it is present.
    if not os.path.exists(script_file):
        report('❌ 找不到 test_ntag424.sh 測試腳本')
        report(f'預期路徑: {script_file}')
    else:
        report('🔄 步驟3: 執行完整測試腳本...')
        report(f'測試腳本路徑: {script_file}')
        _script_output, passed = run_and_emit(
            ['bash', script_file],
            workdir,
            success_keywords=['🎉 所有測試都通過了!'],
        )
        if passed:
            report('✅ NFC 完整測試完成!所有測試都通過了!')
        else:
            report('⚠️ NFC 完整測試完成,但可能有問題')

    finish('NFC 完整測試完成')
@socketio.on('connect')
def ws_connect():
    """Log the connecting client's session id and send it a ``connected`` event."""
    client_id = request.sid
    print("WebSocket 客戶端已連接: {}".format(client_id))
    emit('connected', {'message': '已連接到伺服器'})
@socketio.on('disconnect')
def ws_disconnect():
    """Log the session id of the client that disconnected."""
    client_id = request.sid
    print("WebSocket 客戶端已斷開: {}".format(client_id))
# NOTE(review): this was registered with @socketio.on('error'), which only
# handles a client-emitted event literally named "error". The signature and
# message indicate an exception handler was intended, so it is registered as
# the default Socket.IO error handler instead — confirm no client relies on
# emitting an "error" event to the server.
@socketio.on_error_default
def ws_error(error):
    """Log an exception raised inside a Socket.IO event handler.

    Args:
        error: The exception object passed in by Flask-SocketIO.
    """
    print(f"WebSocket 錯誤: {error}")
if __name__ == '__main__':
    # Print the startup banner, then serve on all interfaces, port 5000.
    separator = "=" * 50
    banner = (
        "NFC 生產工具與腳本執行監控整合版",
        separator,
        "啟動網頁伺服器...",
        "請在瀏覽器中開啟: http://localhost:5000",
        "按 Ctrl+C 停止伺服器",
        separator,
    )
    for banner_line in banner:
        print(banner_line)
    socketio.run(app, host='0.0.0.0', port=5000, debug=False)