#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 整合 NFC 生產工具與腳本執行監控 Flask + Flask-SocketIO """ # 必須在導入其他模組之前執行 eventlet.monkey_patch() import eventlet eventlet.monkey_patch() import os import subprocess import threading import queue import re from datetime import datetime from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit from os.path import dirname, basename, abspath import json import shutil import urllib.parse import requests app = Flask(__name__) app.config['SECRET_KEY'] = 'nfc_web_controller_secret_key' socketio = SocketIO(app, cors_allowed_origins="*") exe_dir = 'linux64_release' CONFIG_FILE = 'config.json' def load_config(): """載入設定檔""" if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {'api_url': ''} def save_config(config): """儲存設定檔""" with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=2) # 載入設定 config = load_config() def get_keys_path(): return os.path.join(exe_dir, 'keys.txt') def get_urls_path(): return os.path.join(exe_dir, 'urls.txt') def get_default_exe_path(): return os.path.join(exe_dir, 'nt4h_c_example') nt4h_exe_path = get_default_exe_path() @app.route('/') def index(): return render_template('index.html') @app.route('/api/set_exe_dir', methods=['POST']) def api_set_exe_dir(): global exe_dir, nt4h_exe_path data = request.get_json() new_dir = data.get('exe_dir') if not new_dir or not os.path.isdir(new_dir): return jsonify({'success': False, 'message': '資料夾不存在'}), 400 exe_dir = new_dir # 跟著自動切換執行檔路徑 nt4h_exe_path = get_default_exe_path() return jsonify({'success': True, 'exe_dir': exe_dir, 'exe_path': nt4h_exe_path}) @app.route('/api/keys') def api_keys(): keys_path = get_keys_path() if not os.path.exists(keys_path): return jsonify({'success': False, 'message': '找不到 keys.txt'}), 404 with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] return jsonify({'success': True, 'keys': keys}) @app.route('/api/urls') def api_urls(): urls_path = get_urls_path() if not os.path.exists(urls_path): return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404 with open(urls_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] return jsonify({'success': True, 'urls': urls}) @app.route('/api/get_url_by_index', methods=['POST']) def api_get_url_by_index(): """根據索引獲取特定 URL""" data = request.get_json() index = data.get('index') if not index or not index.isdigit(): return jsonify({'success': False, 'message': '無效的索引'}), 400 urls_path = get_urls_path() if not os.path.exists(urls_path): return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404 with open(urls_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] idx = int(index) - 1 # 轉換為 0-based 索引 if idx < 0 or idx >= len(urls): return jsonify({'success': False, 'message': '索引超出範圍'}), 400 return jsonify({'success': True, 'url': urls[idx], 'index': index}) @app.route('/api/update_url_by_index', methods=['POST']) def api_update_url_by_index(): """更新特定索引的 URL""" data = request.get_json() index = data.get('index') new_url = data.get('url', '').strip() if not index or not index.isdigit(): return jsonify({'success': False, 'message': '無效的索引'}), 400 if not new_url: return jsonify({'success': False, 'message': 'URL 不得為空'}), 400 urls_path = get_urls_path() if not os.path.exists(urls_path): return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404 # 讀取所有 URL with open(urls_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] idx = int(index) - 1 # 轉換為 0-based 索引 if idx < 0 or idx >= len(urls): return jsonify({'success': False, 'message': '索引超出範圍'}), 400 # 更新 URL urls[idx] = new_url # 寫回檔案 try: with open(urls_path, 'w', encoding='utf-8') as f: for url in urls: f.write(url + '\n') return jsonify({'success': True, 'message': 'URL 更新成功'}) except Exception as e: return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500 @app.route('/api/add_url', methods=['POST']) def api_add_url(): """添加新的 URL""" data = request.get_json() new_url = data.get('url', '').strip() if not new_url: return jsonify({'success': False, 'message': 'URL 不得為空'}), 400 urls_path = get_urls_path() if not os.path.exists(urls_path): return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404 # 讀取現有 URL with open(urls_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] # 添加新 URL urls.append(new_url) # 寫回檔案 try: with open(urls_path, 'w', encoding='utf-8') as f: for url in urls: f.write(url + '\n') return jsonify({'success': True, 'message': 'URL 添加成功', 'new_index': len(urls)}) except Exception as e: return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500 @app.route('/api/delete_url_by_index', methods=['POST']) def api_delete_url_by_index(): """刪除特定索引的 URL""" data = request.get_json() index = data.get('index') if not index or not index.isdigit(): return jsonify({'success': False, 'message': '無效的索引'}), 400 urls_path = get_urls_path() if not os.path.exists(urls_path): return jsonify({'success': False, 'message': '找不到 urls.txt'}), 404 # 讀取所有 URL with open(urls_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] idx = int(index) - 1 # 轉換為 0-based 索引 if idx < 0 or idx >= len(urls): return jsonify({'success': False, 'message': '索引超出範圍'}), 400 # 刪除 URL deleted_url = urls.pop(idx) # 寫回檔案 try: with open(urls_path, 'w', encoding='utf-8') as f: for url in urls: f.write(url + '\n') return jsonify({'success': True, 'message': 'URL 刪除成功', 'deleted_url': deleted_url}) except Exception as e: return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500 @app.route('/api/save_urls_file', methods=['POST']) def api_save_urls_file(): """保存整個 URLs 文件內容""" data = request.get_json() urls_content = data.get('urls_content', '') urls_path = get_urls_path() try: # 將內容按行分割,過濾空行 urls = [line.strip() for line in urls_content.split('\n') if line.strip()] # 寫入檔案 with open(urls_path, 'w', encoding='utf-8') as f: for url in urls: f.write(url + '\n') return jsonify({'success': True, 'message': 'URLs 文件保存成功', 'urls_count': len(urls)}) except Exception as e: return jsonify({'success': False, 'message': f'寫入檔案失敗: {e}'}), 500 @app.route('/api/set_exe_path', methods=['POST']) def api_set_exe_path(): global nt4h_exe_path data = request.get_json() exe_path = data.get('exe_path') if not exe_path or not os.path.isfile(exe_path): return jsonify({'success': False, 'message': '檔案不存在'}), 400 nt4h_exe_path = exe_path return jsonify({'success': True, 'exe_path': nt4h_exe_path}) @app.route('/api/set_api_url', methods=['POST']) def api_set_api_url(): global config data = request.get_json() api_url = data.get('api_url') if not api_url: return jsonify({'success': False, 'message': '未提供 API URL'}), 400 config['api_url'] = api_url save_config(config) return jsonify({'success': True, 'api_url': api_url}) @app.route('/api/set_api_token', methods=['POST']) def api_set_api_token(): global config data = request.get_json() api_token = data.get('api_token') if not api_token: return jsonify({'success': False, 'message': '未提供 API Token'}), 400 config['api_token'] = api_token save_config(config) return jsonify({'success': True, 'message': 'API Token 設定成功'}) @app.route('/api/get_api_token') def api_get_api_token(): return jsonify({'api_token': config.get('api_token', '')}) @app.route('/api/get_api_url') def api_get_api_url(): return jsonify({'api_url': config.get('api_url', '')}) @app.route('/api/get_name') def api_get_name(): return jsonify({'name': config.get('name', '')}) @app.route('/api/set_name', methods=['POST']) def api_set_name(): data = request.get_json() name = data.get('name', '') config['name'] = name save_config(config) return jsonify({'success': True, 'name': name}) @app.route('/api/get_names', methods=['GET']) def api_get_names(): names = config.get('names', []) current = config.get('name', '') return jsonify({'names': names, 'current': current}) @app.route('/api/add_name', methods=['POST']) def api_add_name(): data = request.get_json() name = data.get('name', '').strip() if not name: return jsonify({'success': False, 'message': '名稱不得為空'}), 400 names = config.get('names', []) if name not in names: names.append(name) config['names'] = names # 不自動設定為當前選擇,保持空白 save_config(config) return jsonify({'success': True, 'names': names, 'current': ''}) @app.route('/api/delete_name', methods=['POST']) def api_delete_name(): data = request.get_json() name = data.get('name', '').strip() if not name: return jsonify({'success': False, 'message': '名稱不得為空'}), 400 names = config.get('names', []) if name in names: names.remove(name) config['names'] = names # 如果刪除的是當前選中的名稱,清空當前選擇 if config.get('name') == name: config['name'] = '' save_config(config) return jsonify({'success': True, 'names': names, 'current': config.get('name', '')}) else: return jsonify({'success': False, 'message': '名稱不存在'}), 400 def run_and_emit(cmd, exe_dir, success_keywords=None, stop_on_success=False): """ 執行指令並即時推送結果到前端。 若 success_keywords 有設定,遇到任一關鍵字就回傳 True(可用於 early stop)。 """ socketio.emit('script_output', {'line': f'$ {" ".join(cmd)}'}) try: import os import select import sys # 設定環境變數確保即時輸出 env = os.environ.copy() env['PYTHONUNBUFFERED'] = '1' env['PYTHONIOENCODING'] = 'utf-8' # 使用 pty 來獲得更真實的終端體驗 try: import pty master, slave = pty.openpty() proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=slave, stderr=slave, stdin=slave, env=env, preexec_fn=os.setsid) os.close(slave) output_lines = [] success = False # 使用 select 來非阻塞讀取 while True: try: # 檢查進程是否還在運行 if proc.poll() is not None: break # 使用 select 等待數據 ready, _, _ = select.select([master], [], [], 0.1) if ready: data = os.read(master, 1024).decode('utf-8', errors='ignore') if data: # 按行分割並發送 lines = data.split('\n') for i, line in enumerate(lines): if i < len(lines) - 1: # 完整的行 clean_line = line.rstrip('\r') if clean_line: # 只發送非空行 socketio.emit('script_output', {'line': clean_line}) output_lines.append(clean_line) # 檢查成功關鍵字 if success_keywords and any(kw in clean_line for kw in success_keywords): success = True if stop_on_success: proc.terminate() break else: # 最後一行可能不完整 if line.strip(): # 保存到緩衝區,等待更多數據 pass if success and stop_on_success: break except OSError: break # 讀取剩餘的輸出 try: remaining = os.read(master, 4096).decode('utf-8', errors='ignore') if remaining: for line in remaining.split('\n'): clean_line = line.rstrip('\r') if clean_line: socketio.emit('script_output', {'line': clean_line}) output_lines.append(clean_line) except OSError: pass os.close(master) proc.wait() except ImportError: # 如果 pty 不可用,回退到標準方法 proc = subprocess.Popen(cmd, cwd=exe_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1, env=env) output_lines = [] success = False for line in iter(proc.stdout.readline, ''): if line: clean_line = line.rstrip('\r\n') socketio.emit('script_output', {'line': clean_line}) output_lines.append(clean_line) if success_keywords and any(kw in clean_line for kw in success_keywords): success = True if stop_on_success: break proc.wait() return output_lines, success except Exception as e: socketio.emit('script_output', {'line': f'執行指令時發生錯誤: {e}'}) return [], False def find_valid_oldkey(keys, exe_file, exe_dir): """ 依序 getuid --key <行號>,遇到成功就停下,回傳 (oldkey行號, output_lines) """ for idx in range(len(keys)): key_no = str(idx+1) cmd = [f'./{exe_file}', 'getuid', '--key', key_no] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功讀取 UID', '=== UID 讀取完成 ==='], stop_on_success=False) if success: socketio.emit('script_output', {'line': f'找到可用 oldkey:第{key_no}行'}) return key_no, output_lines return None, [] @app.route('/api/run', methods=['POST']) def api_run(): data = request.get_json() key = data.get('key') url = data.get('url') name = data.get('name', '') if not key: return jsonify({'success': False, 'message': '未選擇 key'}), 400 # 儲存名稱到 config if name: config['name'] = name save_config(config) thread = threading.Thread(target=run_nfc_produce, args=(key, url, name)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始執行生產新SDM流程'}) def run_nfc_produce(newkey, url, name=''): keys_path = get_keys_path() if not os.path.exists(keys_path): socketio.emit('script_output', {'line': '找不到 keys.txt,無法生產NFC'}) socketio.emit('script_finished', {'message': '流程結束'}) return with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) oldkey, _ = find_valid_oldkey(keys, exe_file, exe_dir) if not oldkey: socketio.emit('script_output', {'line': '未找到可用的 oldkey,流程結束'}) socketio.emit('script_finished', {'message': '流程結束'}) return # 執行 changekey socketio.emit('script_output', {'line': '🔄 開始變更金鑰...'}) cmd = [f'./{exe_file}', 'changekey', '--auth-key', oldkey, '--new-key', newkey, '--old-key', oldkey, '--key-no', '0'] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功變更金鑰', '金鑰變更完成'], stop_on_success=False) if not success: socketio.emit('script_output', {'line': '❌ 變更金鑰失敗,流程結束'}) socketio.emit('script_finished', {'message': '流程結束'}) return socketio.emit('script_output', {'line': '✅ 成功變更金鑰!'}) # 執行 setsdm socketio.emit('script_output', {'line': '🔄 開始設定 SDM...'}) cmd = [f'./{exe_file}', 'setsdm', '--url', url, '--key', newkey] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['快速 SDM 設定完成', 'SDM 設定完成'], stop_on_success=False) if not success: socketio.emit('script_output', {'line': '❌ 設定 SDM 失敗,流程結束'}) socketio.emit('script_finished', {'message': '流程結束'}) return socketio.emit('script_output', {'line': '✅ 成功設定 SDM!'}) # 執行 verify socketio.emit('script_output', {'line': '🔄 開始 CMAC 認證...'}) cmd = [f'./{exe_file}', 'verify', '--key', newkey] output_lines, success = run_and_emit(cmd, exe_dir) # 解析 verify 結果 uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, newkey, keys) if uid and sdm_ctr and cmac: socketio.emit('script_output', {'line': f'✅ CMAC 認證成功!'}) socketio.emit('script_output', {'line': f'UID: {uid}'}) socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'}) socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'}) # POST 到 API post_to_api(uid, sdm_ctr, cmac, newkey, name) else: socketio.emit('script_output', {'line': '❌ CMAC 認證失敗,無法解析結果'}) socketio.emit('script_finished', {'message': '生產新SDM流程結束'}) @app.route('/api/read_nfc', methods=['POST']) def api_read_nfc(): thread = threading.Thread(target=read_nfc) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始批次讀取所有NFC'}) def read_nfc(): keys_path = get_keys_path() if not os.path.exists(keys_path): socketio.emit('script_output', {'line': '找不到 keys.txt,無法批次讀取NFC'}) socketio.emit('script_finished', {'message': '讀取結束'}) return with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] exe_dir_path = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) for idx in range(len(keys)): key_no = str(idx+1) cmd = [f'./{exe_file}', 'getuid', '--key', key_no] output_lines, success = run_and_emit(cmd, exe_dir_path, success_keywords=['成功讀取 UID', '=== UID 讀取完成 ==='], stop_on_success=False) if success: socketio.emit('script_output', {'line': f'成功讀取到 UID,停止讀取'}) break socketio.emit('script_finished', {'message': '讀取結束'}) @app.route('/api/verify', methods=['POST']) def api_verify(): data = request.get_json() key = data.get('key') name = data.get('name', '') if not key: return jsonify({'success': False, 'message': '未選擇 key'}), 400 # 儲存名稱到 config if name: config['name'] = name save_config(config) thread = threading.Thread(target=verify_nfc, args=(key, name)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始 CMAC 認證流程'}) def parse_verify_output(output_lines, key_no, keys): uid = None sdm_ctr = None cmac = None for line in output_lines: if 'UID:' in line: uid = line.split('UID:')[1].strip() elif 'SDM 讀取計數器:' in line: sdm_ctr = line.split('SDM 讀取計數器:')[1].strip() elif 'ASCII MAC 資料:' in line: cmac = line.split('ASCII MAC 資料:')[1].strip() key_value = keys[int(key_no)-1] if key_no and key_no.isdigit() and 0 < int(key_no) <= len(keys) else '' return uid, sdm_ctr, cmac, key_value def post_to_api(uid, sdm_ctr, cmac, key_index, name=''): """將 UID、SDM 計數器、CMAC 資料 POST 到指定的 API URL""" api_url = config.get('api_url', '') if not api_url: socketio.emit('script_output', {'line': '⚠️ 未設定資料API URL,跳過POST請求'}) return # 檢查是否有 API Token api_token = config.get('api_token', '') if not api_token: socketio.emit('script_output', {'line': '⚠️ 未設定 API Token,跳過POST請求'}) return # 構建 POST 請求的 URL post_url = f"{api_url}/nfc/insertForProduction" # 準備 JSON 資料 json_data = { 'uid': uid, 'ctr': sdm_ctr, 'key': key_index } # 準備請求標頭,包含 API Token headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } socketio.emit('script_output', {'line': f'🔄 正在發送資料到 API: {post_url}'}) socketio.emit('script_output', {'line': f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}'}) socketio.emit('script_output', {'line': f'🔐 使用 API Token: {api_token[:20]}...'}) try: # 發送 POST 請求,包含 API Token response = requests.post(post_url, json=json_data, headers=headers, timeout=30) socketio.emit('script_output', {'line': f'📡 API 回應狀態碼: {response.status_code}'}) if response.status_code == 200: socketio.emit('script_output', {'line': '✅ 資料成功發送到 API'}) try: # 檢查回應內容是否為空 if response.text and response.text.strip(): response_data = response.json() socketio.emit('script_output', {'line': f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}'}) else: socketio.emit('script_output', {'line': '📄 API 回應: (空回應)'}) except json.JSONDecodeError as e: socketio.emit('script_output', {'line': f'📄 API 回應 (非JSON格式): {response.text}'}) except Exception as e: socketio.emit('script_output', {'line': f'📄 API 回應解析錯誤: {str(e)}'}) socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'}) elif response.status_code == 401: socketio.emit('script_output', {'line': '❌ API 認證失敗,請檢查 API Token 是否正確'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) elif response.status_code == 500: socketio.emit('script_output', {'line': '❌ 伺服器內部錯誤 (500)'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) else: socketio.emit('script_output', {'line': f'❌ API 請求失敗,狀態碼: {response.status_code}'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) except requests.exceptions.Timeout: socketio.emit('script_output', {'line': '❌ API 請求超時'}) except requests.exceptions.ConnectionError: socketio.emit('script_output', {'line': '❌ 無法連接到 API 伺服器'}) except Exception as e: socketio.emit('script_output', {'line': f'❌ API 請求發生錯誤: {str(e)}'}) def verify_nfc(key_no, name=''): keys_path = get_keys_path() if not os.path.exists(keys_path): socketio.emit('script_output', {'line': '找不到 keys.txt,無法認證'}) socketio.emit('script_finished', {'message': '流程結束'}) return with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) # 執行 CMAC 認證 cmd = [f'./{exe_file}', 'verify', '--key', key_no] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['MAC 驗證成功'], stop_on_success=False) # 檢查本地認證結果 if success: socketio.emit('script_output', {'line': f'✅ 本地 CMAC 認證成功!'}) # 解析結果 uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, key_no, keys) if uid and sdm_ctr and cmac: socketio.emit('script_output', {'line': f'UID: {uid}'}) socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'}) socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'}) # 進行遠端認證 verify_remote(uid, sdm_ctr, cmac, key_no) else: socketio.emit('script_output', {'line': '❌ 無法解析認證結果'}) else: # 檢查是否有 MAC 驗證失敗的錯誤碼 mac_failed = False for line in output_lines: if 'MAC 驗證失敗' in line and '0x' in line: socketio.emit('script_output', {'line': f'❌ 本地 CMAC 認證失敗: {line.strip()}'}) mac_failed = True break if not mac_failed: socketio.emit('script_output', {'line': '❌ 本地 CMAC 認證失敗'}) socketio.emit('script_finished', {'message': 'CMAC 認證流程結束'}) def verify_remote(uid, sdm_ctr, cmac, key_no): """進行遠端 CMAC 認證""" verify_url = f"https://verify.contree.app/verify?uid={uid}&ctr={sdm_ctr}&cmac={cmac}&key={key_no}" socketio.emit('script_output', {'line': '🔄 開始遠端認證...'}) socketio.emit('script_output', {'line': f'認證 URL: {verify_url}'}) try: # 發送 GET 請求到遠端認證服務 response = requests.get(verify_url, timeout=30) socketio.emit('script_output', {'line': f'📡 遠端認證回應狀態碼: {response.status_code}'}) if response.status_code == 200: try: # 檢查回應內容是否為空 if response.text and response.text.strip(): response_data = response.json() if response_data.get('success') == True: socketio.emit('script_output', {'line': '✅ 線上認證成功!'}) socketio.emit('script_output', {'line': f'📄 認證訊息: {response_data.get("message", "")}'}) socketio.emit('script_output', {'line': f'📄 認證時間: {response_data.get("timestamp", "")}'}) # 顯示詳細資訊 details = response_data.get('details', {}) if details: socketio.emit('script_output', {'line': f'📄 認證詳情:'}) socketio.emit('script_output', {'line': f' UID: {details.get("uid", "")}'}) socketio.emit('script_output', {'line': f' 計數器: {details.get("counter", "")}'}) socketio.emit('script_output', {'line': f' CMAC: {details.get("cmac", "")}'}) socketio.emit('script_output', {'line': f' 金鑰索引: {details.get("key_index", "")}'}) else: socketio.emit('script_output', {'line': '❌ 線上認證失敗'}) socketio.emit('script_output', {'line': f'📄 錯誤訊息: {response_data.get("message", "")}'}) else: socketio.emit('script_output', {'line': '📄 遠端認證回應: (空回應)'}) except json.JSONDecodeError as e: socketio.emit('script_output', {'line': f'📄 遠端認證回應 (非JSON格式): {response.text}'}) except Exception as e: socketio.emit('script_output', {'line': f'📄 遠端認證回應解析錯誤: {str(e)}'}) socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'}) else: socketio.emit('script_output', {'line': f'❌ 遠端認證請求失敗,狀態碼: {response.status_code}'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) except requests.exceptions.Timeout: socketio.emit('script_output', {'line': '❌ 遠端認證請求超時'}) except requests.exceptions.ConnectionError: socketio.emit('script_output', {'line': '❌ 無法連接到遠端認證服務'}) except Exception as e: socketio.emit('script_output', {'line': f'❌ 遠端認證發生錯誤: {str(e)}'}) def post_to_verify_api(uid, name=''): """將 UID POST 到資料 API 進行檢驗""" api_url = config.get('api_url', '') if not api_url: socketio.emit('script_output', {'line': '⚠️ 未設定資料API URL,跳過POST請求'}) return # 檢查是否有 API Token api_token = config.get('api_token', '') if not api_token: socketio.emit('script_output', {'line': '⚠️ 未設定 API Token,跳過POST請求'}) return # 構建 POST 請求的 URL post_url = f"{api_url}/nfc/verifyForProduction" # 準備 JSON 資料 json_data = { 'uid': uid } # 準備請求標頭,包含 API Token headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } socketio.emit('script_output', {'line': f'🔄 正在發送檢驗資料到 API: {post_url}'}) socketio.emit('script_output', {'line': f'JSON 資料: {json.dumps(json_data, ensure_ascii=False)}'}) socketio.emit('script_output', {'line': f'🔐 使用 API Token: {api_token[:20]}...'}) try: # 發送 POST 請求,包含 API Token response = requests.post(post_url, json=json_data, headers=headers, timeout=30) socketio.emit('script_output', {'line': f'📡 API 回應狀態碼: {response.status_code}'}) if response.status_code == 200: socketio.emit('script_output', {'line': '✅ 檢驗資料成功發送到 API'}) try: # 檢查回應內容是否為空 if response.text and response.text.strip(): response_data = response.json() socketio.emit('script_output', {'line': f'📄 API 回應: {json.dumps(response_data, ensure_ascii=False)}'}) else: socketio.emit('script_output', {'line': '📄 API 回應: (空回應)'}) except json.JSONDecodeError as e: socketio.emit('script_output', {'line': f'📄 API 回應 (非JSON格式): {response.text}'}) except Exception as e: socketio.emit('script_output', {'line': f'📄 API 回應解析錯誤: {str(e)}'}) socketio.emit('script_output', {'line': f'📄 原始回應: {response.text}'}) elif response.status_code == 401: socketio.emit('script_output', {'line': '❌ API 認證失敗,請檢查 API Token 是否正確'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) elif response.status_code == 500: socketio.emit('script_output', {'line': '❌ 伺服器內部錯誤 (500)'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) else: socketio.emit('script_output', {'line': f'❌ API 請求失敗,狀態碼: {response.status_code}'}) socketio.emit('script_output', {'line': f'📄 錯誤回應: {response.text if response.text else "(空回應)"}'}) except requests.exceptions.Timeout: socketio.emit('script_output', {'line': '❌ API 請求超時'}) except requests.exceptions.ConnectionError: socketio.emit('script_output', {'line': '❌ 無法連接到 API 伺服器'}) except Exception as e: socketio.emit('script_output', {'line': f'❌ API 請求發生錯誤: {str(e)}'}) # ------------------ 手動測試功能 ------------------ @app.route('/manual_test') def manual_test_page(): return render_template('manual_test.html') @app.route('/api/manual_verify', methods=['POST']) def api_manual_verify(): """手動驗證 API""" data = request.get_json() uid = data.get('uid', '').strip() ctr = data.get('ctr', '').strip() cmac = data.get('cmac', '').strip() key_index = data.get('key_index', '').strip() url_index = data.get('url_index', '').strip() full_url = data.get('full_url', '').strip() quiet_mode = data.get('quiet_mode', False) if not uid or not ctr or not cmac: return jsonify({'success': False, 'message': 'UID、計數器和 CMAC 為必填欄位'}), 400 thread = threading.Thread(target=run_manual_verify, args=(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始手動驗證流程'}) def run_manual_verify(uid, ctr, cmac, key_index, url_index, full_url, quiet_mode): """執行手動驗證""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'verify', '--manual', '--uid', uid, '--ctr', ctr, '--cmac', cmac] if key_index: cmd.extend(['--key', key_index]) if url_index: cmd.extend(['--url-index', url_index]) if full_url: cmd = [f'./{exe_file}', 'verify', '--manual', '--url', full_url] if key_index: cmd.extend(['--key', key_index]) if quiet_mode: cmd.append('--quiet') socketio.emit('script_output', {'line': f'執行手動驗證: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '手動驗證完成'}) @app.route('/api/manual_getuid', methods=['POST']) def api_manual_getuid(): """手動讀取 UID API""" data = request.get_json() key_index = data.get('key_index', '').strip() quiet_mode = data.get('quiet_mode', False) thread = threading.Thread(target=run_manual_getuid, args=(key_index, quiet_mode)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始讀取 UID 流程'}) def run_manual_getuid(key_index, quiet_mode): """執行手動讀取 UID""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'getuid'] if key_index: cmd.extend(['--key', key_index]) if quiet_mode: cmd.append('--quiet') socketio.emit('script_output', {'line': f'執行讀取 UID: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '讀取 UID 完成'}) @app.route('/api/manual_setsdm', methods=['POST']) def api_manual_setsdm(): """手動設定 SDM API""" data = request.get_json() url_index = data.get('url_index', '').strip() key_index = data.get('key_index', '').strip() quiet_mode = data.get('quiet_mode', False) if not url_index: return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400 thread = threading.Thread(target=run_manual_setsdm, args=(url_index, key_index, quiet_mode)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始設定 SDM 流程'}) def run_manual_setsdm(url_index, key_index, quiet_mode): """執行手動設定 SDM""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'setsdm', '--url', url_index] if key_index: cmd.extend(['--key', key_index]) if quiet_mode: cmd.append('--quiet') socketio.emit('script_output', {'line': f'執行設定 SDM: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '設定 SDM 完成'}) @app.route('/api/manual_changekey', methods=['POST']) def api_manual_changekey(): """手動變更金鑰 API""" data = request.get_json() auth_key = data.get('auth_key', '').strip() new_key = data.get('new_key', '').strip() old_key = data.get('old_key', '').strip() key_no = data.get('key_no', '0').strip() quiet_mode = data.get('quiet_mode', False) if not auth_key or not new_key or not old_key: return jsonify({'success': False, 'message': '認證金鑰、新金鑰和舊金鑰為必填欄位'}), 400 thread = threading.Thread(target=run_manual_changekey, args=(auth_key, new_key, old_key, key_no, quiet_mode)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始變更金鑰流程'}) def run_manual_changekey(auth_key, new_key, old_key, key_no, quiet_mode): """執行手動變更金鑰""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'changekey', '--auth-key', auth_key, '--new-key', new_key, '--old-key', old_key, '--key-no', key_no] if quiet_mode: cmd.append('--quiet') socketio.emit('script_output', {'line': f'執行變更金鑰: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '變更金鑰完成'}) @app.route('/api/manual_writendef', methods=['POST']) def api_manual_writendef(): """手動寫入 NDEF API""" data = request.get_json() url_index = data.get('url_index', '').strip() if not url_index: return jsonify({'success': False, 'message': 'URL 索引為必填欄位'}), 400 thread = threading.Thread(target=run_manual_writendef, args=(url_index,)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始寫入 NDEF 流程'}) def run_manual_writendef(url_index): """執行手動寫入 NDEF""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'writendef', '--url-index', url_index] socketio.emit('script_output', {'line': f'執行寫入 NDEF: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '寫入 NDEF 完成'}) @app.route('/api/manual_readndef', methods=['POST']) def api_manual_readndef(): """手動讀取 NDEF API""" thread = threading.Thread(target=run_manual_readndef) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始讀取 NDEF 流程'}) def run_manual_readndef(): """執行手動讀取 NDEF""" exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) cmd = [f'./{exe_file}', 'readndef'] socketio.emit('script_output', {'line': f'執行讀取 NDEF: {" ".join(cmd)}'}) output_lines, success = run_and_emit(cmd, exe_dir) socketio.emit('script_finished', {'message': '讀取 NDEF 完成'}) @app.route('/api/manual_custom_command', methods=['POST']) def api_manual_custom_command(): """手動自定義指令 API""" data = request.get_json() command = data.get('command', '').strip() description = data.get('description', '').strip() if not command: return jsonify({'success': False, 'message': '指令不得為空'}), 400 # 基本安全檢查:確保指令以 ./nt4h_c_example 開頭 if not command.startswith('./nt4h_c_example'): return jsonify({'success': False, 'message': '指令必須以 ./nt4h_c_example 開頭'}), 400 thread = threading.Thread(target=run_manual_custom_command, args=(command, description)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始執行自定義指令'}) def run_manual_custom_command(command, description=''): """執行手動自定義指令""" # 使用絕對路徑 exe_dir = abspath('linux64_release') # 解析指令 cmd_parts = command.split() # 移除開頭的 ./nt4h_c_example,使用完整路徑 if cmd_parts[0] == './nt4h_c_example': cmd_parts[0] = abspath('linux64_release/nt4h_c_example') else: # 如果不是以 ./nt4h_c_example 開頭,直接使用完整路徑 cmd_parts[0] = abspath('linux64_release/nt4h_c_example') socketio.emit('script_output', {'line': f'執行自定義指令: {" ".join(cmd_parts)}'}) if description: socketio.emit('script_output', {'line': f'指令說明: {description}'}) output_lines, success = run_and_emit(cmd_parts, exe_dir) socketio.emit('script_finished', {'message': '自定義指令執行完成'}) @app.route('/api/check_nfc', methods=['POST']) def api_check_nfc(): """檢驗 NFC API""" data = request.get_json() key = data.get('key') name = data.get('name', '') if not key: return jsonify({'success': False, 'message': '未選擇 key'}), 400 # 儲存名稱到 config if name: config['name'] = name save_config(config) thread = threading.Thread(target=run_check_nfc, args=(key, name)) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始檢驗 NFC 流程'}) def run_check_nfc(key_no, name=''): """執行檢驗 NFC""" keys_path = get_keys_path() if not os.path.exists(keys_path): socketio.emit('script_output', {'line': '找不到 keys.txt,無法檢驗NFC'}) socketio.emit('script_finished', {'message': '流程結束'}) return with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) # 執行本地 CMAC 認證 socketio.emit('script_output', {'line': '🔍 開始檢驗 NFC...'}) cmd = [f'./{exe_file}', 'verify', '--key', key_no] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['MAC 驗證成功'], stop_on_success=False) # 檢查本地認證結果 if success: socketio.emit('script_output', {'line': f'✅ 本地 CMAC 認證成功!'}) # 解析結果 uid, sdm_ctr, cmac, key_value = parse_verify_output(output_lines, key_no, keys) if uid: socketio.emit('script_output', {'line': f'UID: {uid}'}) socketio.emit('script_output', {'line': f'SDM 讀取計數器: {sdm_ctr}'}) socketio.emit('script_output', {'line': f'ASCII MAC 資料: {cmac}'}) # POST 到資料 API post_to_verify_api(uid, name) else: socketio.emit('script_output', {'line': '❌ 無法解析 UID'}) else: # 檢查是否有 MAC 驗證失敗的錯誤碼 mac_failed = False for line in output_lines: if 'MAC 驗證失敗' in line and '0x' in line: socketio.emit('script_output', {'line': f'❌ 本地 CMAC 認證失敗: {line.strip()}'}) mac_failed = True break if not mac_failed: socketio.emit('script_output', {'line': '❌ 本地 CMAC 認證失敗'}) socketio.emit('script_finished', {'message': '檢驗 NFC 流程結束'}) @app.route('/api/test_nfc', methods=['POST']) def api_test_nfc(): """完整測試 NFC API""" thread = threading.Thread(target=run_test_nfc) thread.daemon = True thread.start() return jsonify({'success': True, 'message': '開始完整測試 NFC 流程'}) def run_test_nfc(): """執行完整測試 NFC""" keys_path = get_keys_path() if not os.path.exists(keys_path): socketio.emit('script_output', {'line': '找不到 keys.txt,無法執行完整測試'}) socketio.emit('script_finished', {'message': '流程結束'}) return with open(keys_path, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f if line.strip()] exe_dir = dirname(abspath(nt4h_exe_path)) exe_file = basename(nt4h_exe_path) test_script_path = os.path.join(exe_dir, 'test_ntag424.sh') socketio.emit('script_output', {'line': '🔍 開始完整測試 NFC 功能...'}) # 步驟1: 讀取UID找到可用的KEY socketio.emit('script_output', {'line': '🔄 步驟1: 讀取UID尋找可用KEY...'}) oldkey, _ = find_valid_oldkey(keys, exe_file, exe_dir) if not oldkey: socketio.emit('script_output', {'line': '❌ 未找到可用的 oldkey,無法執行完整測試'}) socketio.emit('script_finished', {'message': '流程結束'}) return socketio.emit('script_output', {'line': f'✅ 找到可用 oldkey:第{oldkey}行'}) # 步驟2: 強制更換到key 1 socketio.emit('script_output', {'line': '🔄 步驟2: 強制更換到key 1...'}) cmd = [f'./{exe_file}', 'changekey', '--auth-key', oldkey, '--new-key', '1', '--old-key', oldkey, '--key-no', '0'] output_lines, success = run_and_emit(cmd, exe_dir, success_keywords=['成功變更金鑰', '金鑰變更完成'], stop_on_success=False) if not success: socketio.emit('script_output', {'line': '❌ 強制更換到key 1失敗,無法執行完整測試'}) socketio.emit('script_finished', {'message': '流程結束'}) return socketio.emit('script_output', {'line': '✅ 成功強制更換到key 1!'}) # 步驟3: 執行測試腳本 if os.path.exists(test_script_path): socketio.emit('script_output', {'line': '🔄 步驟3: 執行完整測試腳本...'}) socketio.emit('script_output', {'line': f'測試腳本路徑: {test_script_path}'}) # 執行測試腳本 test_cmd = ['bash', test_script_path] test_output_lines, test_success = run_and_emit(test_cmd, exe_dir, success_keywords=['🎉 所有測試都通過了!']) if test_success: socketio.emit('script_output', {'line': '✅ NFC 完整測試完成!所有測試都通過了!'}) else: socketio.emit('script_output', {'line': '⚠️ NFC 完整測試完成,但可能有問題'}) else: socketio.emit('script_output', {'line': '❌ 找不到 test_ntag424.sh 測試腳本'}) socketio.emit('script_output', {'line': f'預期路徑: {test_script_path}'}) socketio.emit('script_finished', {'message': 'NFC 完整測試完成'}) @socketio.on('connect') def ws_connect(): print(f"WebSocket 客戶端已連接: {request.sid}") emit('connected', {'message': '已連接到伺服器'}) @socketio.on('disconnect') def ws_disconnect(): print(f"WebSocket 客戶端已斷開: {request.sid}") @socketio.on('error') def ws_error(error): print(f"WebSocket 錯誤: {error}") if __name__ == '__main__': print("NFC 生產工具與腳本執行監控整合版") print("=" * 50) #print(f"預設腳本路徑: {script_path}") print("啟動網頁伺服器...") print("請在瀏覽器中開啟: http://localhost:5000") print("按 Ctrl+C 停止伺服器") print("=" * 50) socketio.run(app, host='0.0.0.0', port=5000, debug=False)