以下是一个示例代码,演示了如何在按下控制台按键时中断线程运行:
import threading
import time
import signal
import sys
class MyThread(threading.Thread):
def __init__(self):
super().__init__()
self.stop_flag = threading.Event()
def run(self):
while not self.stop_flag.is_set():
# 线程的主要逻辑
print("线程运行中...")
time.sleep(1)
def stop(self):
self.stop_flag.set()
def signal_handler(signal, frame):
# 中断信号处理函数
print("中断信号接收到,停止线程运行")
thread.stop()
sys.exit(0)
if __name__ == "__main__":
# 创建线程对象
thread = MyThread()
# 注册中断信号处理函数
signal.signal(signal.SIGINT, signal_handler)
# 启动线程
thread.start()
# 等待用户按下控制台按键
input("按下任意键中断线程运行...")
# 停止线程
thread.stop()
# 等待线程结束
thread.join()
print("线程已停止运行")
在上面的示例中,创建了一个继承自Thread
的自定义线程类MyThread
。该线程类包含一个stop_flag
事件,用于控制线程是否停止运行。在线程的run
方法中,通过检查stop_flag
事件是否被设置来决定线程是否继续运行。
在主程序中,首先创建了一个MyThread
对象,并注册了一个中断信号处理函数signal_handler
。然后启动线程,并使用input
函数等待用户按下任意键。当用户按下控制台按键时,触发了中断信号,中断信号处理函数将调用线程的stop
方法来停止线程的运行。
最后,程序等待线程结束并输出相关信息。
注意:这里使用了signal
模块来处理中断信号,这是一个在Unix系统上较为常用的方法。在Windows系统上,可能需要使用其他方式处理中断信号。
下一篇:按照空值优先的降序排列