"""功能 python 获取window11电脑的屏幕的视频/音频流并保存为mp4格式文件 """ """安装核心依赖 & .venv/Scripts/python.exe -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade aiortc pywin32 numpy opencv-python sounddevice pycaw comtypes """ import os import asyncio import ctypes import numpy as np import win32gui import win32ui import win32con import win32api import sounddevice as sd from aiortc import MediaStreamTrack, RTCPeerConnection from aiortc.contrib.media import MediaRecorder # -------------------------- 1. Windows 11高DPI适配 -------------------------- ctypes.windll.shcore.SetProcessDpiAwareness(2) # 解决高分屏采集偏移问题 # -------------------------- 2. 兼容VideoFrame导入 -------------------------- try:from aiortc.contrib.media import VideoFrame,AudioFrame except ImportError:from aiortc import VideoFrame,AudioFrame # -------------------------- 2. 屏幕视频采集轨道 -------------------------- class ScreenVideoTrack(MediaStreamTrack): kind = "video" def __init__(self, fps=30, scale_factor=1.0): super().__init__() self.fps = fps self.scale_factor = scale_factor self.stop_flag = False self.pts = 0 self.frame_interval = 1.0 / fps # 获取真实屏幕分辨率(适配高DPI) self.user32 = ctypes.windll.user32 self.screen_width = self.user32.GetSystemMetrics(0) self.screen_height = self.user32.GetSystemMetrics(1) def capture_screen(self): """Windows 11原生API采集屏幕(低延迟)""" left, top, width, height = 0, 0, self.screen_width, self.screen_height # 1. 创建设备上下文(DC) hdesktop = win32gui.GetDesktopWindow() hwnd_dc = win32gui.GetWindowDC(hdesktop) mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc) save_dc = mfc_dc.CreateCompatibleDC() # 2. 复制屏幕内容到位图 save_bitmap = win32ui.CreateBitmap() save_bitmap.CreateCompatibleBitmap(mfc_dc, width, height) save_dc.SelectObject(save_bitmap) save_dc.BitBlt((0, 0), (width, height), mfc_dc, (left, top), win32con.SRCCOPY) # 3. 转换为numpy数组(BGR格式) bmp_data = save_bitmap.GetBitmapBits(True) frame = np.frombuffer(bmp_data, dtype=np.uint8).reshape((height, width, 4)) frame = frame[:, :, :3] # 去掉Alpha通道 frame = frame[:, :, ::-1] # BGRA → BGR(适配aiortc) # 4. 缩放(可选,降低文件大小) if self.scale_factor != 1.0: new_width = int(width * self.scale_factor) new_height = int(height * self.scale_factor) frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA) # 5. 释放资源(避免内存泄漏) win32gui.DeleteObject(save_bitmap.GetHandle()) save_dc.DeleteDC() mfc_dc.DeleteDC() win32gui.ReleaseDC(hdesktop, hwnd_dc) return frame async def recv(self): """aiortc核心方法:持续返回视频帧""" if self.stop_flag: raise StopAsyncIteration # 控制帧率 await asyncio.sleep(self.frame_interval) # 获取windows11 原始的视频帧 frame_data = self.capture_screen() # 转换为aiortc VideoFrame video_frame = self._convert_to_video_frame(frame_data) video_frame.pts = int(self.pts) video_frame.time_base = np.array([1, self.fps]) self.pts += 1 return video_frame def _convert_to_video_frame(self, img): """将numpy数组转换为aiortc的VideoFrame""" return VideoFrame.from_ndarray(img, format="bgr24") def stop(self): """停止屏幕采集""" self.stop_flag = True # -------------------------- 3. 系统音频采集轨道 -------------------------- class SystemAudioTrack(MediaStreamTrack): kind = "audio" def __init__(self, sample_rate=48000, channels=2): super().__init__() self.sample_rate = sample_rate self.channels = channels self.stop_flag = False self.pts = 0 self.audio_queue = asyncio.Queue(maxsize=10) # 音频数据队列 # 启动麦克风/系统音频采集(这里默认采集麦克风,系统音频见备注) self._start_audio_capture() def _start_audio_capture(self): """启动音频采集(麦克风)""" def audio_callback(indata, frames, time, status): if status or self.stop_flag: return # 将音频数据转为float32格式(aiortc要求) audio_data = indata.astype(np.float32) self.audio_queue.put_nowait(audio_data) # 打开音频输入流 self.audio_stream = sd.InputStream( samplerate=self.sample_rate, channels=self.channels, callback=audio_callback, blocksize=1024 ) self.audio_stream.start() async def recv(self): """aiortc核心方法:持续返回音频帧""" if self.stop_flag and self.audio_queue.empty(): raise StopAsyncIteration # 从队列获取音频数据 audio_data = await self.audio_queue.get() sample_count = len(audio_data) # 转换为aiortc AudioFrame audio_frame = AudioFrame( samples=audio_data.T, # 转置为 (channels, samples) sample_rate=self.sample_rate, channels=self.channels ) audio_frame.pts = int(self.pts) audio_frame.time_base = np.array([1, self.sample_rate]) self.pts += sample_count return audio_frame def stop(self): """停止音频采集""" self.stop_flag = True self.audio_stream.stop() self.audio_stream.close() # -------------------------- 2. 工具函数:枚举音频设备(解决Error querying device -1错误) -------------------------- def list_audio_devices(): """枚举所有音频设备,输出ID和名称,方便手动选择""" print("📢 可用音频设备列表:") devices = sd.query_devices() for idx, dev in enumerate(devices): print(f"ID {idx}: {dev['name']} | 输入通道:{dev['max_input_channels']} | 输出通道:{dev['max_output_channels']}") return devices def get_vb_cable_device_id(): """查找VB-Cable虚拟音频设备ID(系统声音采集必需)""" devices = sd.query_devices() for idx, dev in enumerate(devices): if "Cable" in dev['name'] and dev['max_input_channels'] > 0: return idx raise Exception("❌ 未找到VB-Cable虚拟音频设备!\n 请先安装:https://vb-audio.com/Cable/") # -------------------------- 3. 系统声音录制为MP3 -------------------------- def record_system_audio_to_mp3(output_mp3="system_audio.mp3", duration=5, sample_rate=48000): """ 录制系统声音为MP3(需先安装VB-Cable并设置系统音频输出到VB-Cable) :param output_mp3: 输出MP3文件名 :param duration: 录制时长(秒) :param sample_rate: 采样率 """ # 修复device -1错误:手动指定VB-Cable设备ID vb_cable_id = get_vb_cable_device_id() print(f"✅ 已找到VB-Cable设备,ID:{vb_cable_id}") # 1. 录制音频为WAV(先录WAV再转MP3,避免编码问题) wav_file = "temp_audio.wav" print(f"🎙️ 开始录制系统声音({duration}秒)...") audio_data = sd.rec( int(duration * sample_rate), samplerate=sample_rate, channels=2, dtype='float32', device=vb_cable_id # 关键:指定有效设备ID,解决-1错误 ) sd.wait() # 等待录制完成 # 2. 保存为WAV(临时文件) with wave.open(wav_file, 'wb') as wf: wf.setnchannels(2) wf.setsampwidth(2) wf.setframerate(sample_rate) wf.writeframes((audio_data * 32767).astype(np.int16).tobytes()) # 3. WAV转MP3(用OpenCV/ffmpeg,无需额外依赖) print("🔄 转换WAV到MP3...") os.system(f"ffmpeg -y -i {wav_file} -codec:a libmp3lame -b:a 192k {output_mp3}") # 4. 删除临时WAV文件 if os.path.exists(wav_file): os.remove(wav_file) print(f"✅ 系统声音已保存:{output_mp3}") # -------------------------- 4. 屏幕60帧/秒截图(5秒)并合成MP4 -------------------------- def capture_screen_60fps(output_dir="screen_shots", duration=5, fps=60): """ 5秒内每秒60张屏幕截图(共300张),并合成高帧率MP4 :param output_dir: 截图保存目录 :param duration: 录制时长(秒) :param fps: 帧率(60帧/秒) """ # 1. 创建截图目录 if not os.path.exists(output_dir): os.makedirs(output_dir) print(f"🖥️ 开始屏幕截图({duration}秒,{fps}帧/秒)...") # 2. 获取屏幕真实分辨率(适配高DPI) user32 = ctypes.windll.user32 screen_width = user32.GetSystemMetrics(0) screen_height = user32.GetSystemMetrics(1) total_frames = duration * fps # 总帧数:5*60=300帧 frame_interval = 1.0 / fps # 每帧间隔(约16.67ms) # 3. 循环截图(60帧/秒) for frame_idx in range(total_frames): # Windows原生API采集屏幕(低延迟,适配60帧) hdesktop = win32gui.GetDesktopWindow() hwnd_dc = win32gui.GetWindowDC(hdesktop) mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc) save_dc = mfc_dc.CreateCompatibleDC() save_bitmap = win32ui.CreateBitmap() save_bitmap.CreateCompatibleBitmap(mfc_dc, screen_width, screen_height) save_dc.SelectObject(save_bitmap) save_dc.BitBlt((0, 0), (screen_width, screen_height), mfc_dc, (0, 0), win32con.SRCCOPY) # 转换为numpy数组并保存为PNG(无损) bmp_data = save_bitmap.GetBitmapBits(True) frame = np.frombuffer(bmp_data, dtype=np.uint8).reshape((screen_height, screen_width, 4)) frame = frame[:, :, :3][:, :, ::-1] # BGRA→BGR # 保存截图(命名:frame_0001.png ~ frame_0300.png) frame_filename = os.path.join(output_dir, f"frame_{frame_idx+1:04d}.png") cv2.imwrite(frame_filename, frame, [cv2.IMWRITE_PNG_COMPRESSION, 0]) # 释放资源 win32gui.DeleteObject(save_bitmap.GetHandle()) save_dc.DeleteDC() mfc_dc.DeleteDC() win32gui.ReleaseDC(hdesktop, hwnd_dc) # 控制帧率(确保每秒60帧) asyncio.run( asyncio.sleep(frame_interval) ) # 进度提示 if (frame_idx + 1) % 60 == 0: print(f" 已截图:{frame_idx+1}/{total_frames}帧({int((frame_idx+1)/total_frames*100)}%)") # 4. 将截图合成60帧/秒的MP4 print("🔄 将截图合成60帧/秒MP4...") output_mp4 = "screen_60fps.mp4" # FFmpeg合成命令(H.264编码,60帧/秒) ffmpeg_cmd = ( f'ffmpeg -y -framerate {fps} -i {output_dir}/frame_%04d.png ' f'-c:v libx264 -r {fps} -pix_fmt yuv420p {output_mp4}' ) os.system(ffmpeg_cmd) print(f"✅ 60帧屏幕视频已保存:{output_mp4}") print(f"📁 原始截图保存在:{output_dir}") # -------------------------- 4. 主逻辑:采集并保存为MP4 -------------------------- async def record_screen_to_mp4(output_file="screen_record.mp4", duration=10): """ 录制屏幕+音频到MP4 :param output_file: 输出MP4文件名 :param duration: 录制时长(秒) """ # 1. 创建音视频轨道 video_track = ScreenVideoTrack(fps=30, scale_factor=0.8) # 0.8倍缩放,减小文件 audio_track = SystemAudioTrack(sample_rate=48000) # 2. 创建MediaRecorder(封装为MP4) recorder = MediaRecorder( output_file, format="mp4", # 指定输出格式 options={ "video_codec": "h264", # H.264编码,兼容性最好 "audio_codec": "aac", # AAC音频编码 "video_bitrate": "2000k" # 视频码率(可调整) } ) # 3. 添加音视频轨道到录制器 recorder.addTrack(video_track) recorder.addTrack(audio_track) # 4. 开始录制 print(f"✅ 开始录制屏幕,时长{duration}秒,输出文件:{output_file}") await recorder.start() # 5. 录制指定时长 await asyncio.sleep(duration) # 6. 停止录制并清理资源 print("🔚 录制结束,正在保存文件...") video_track.stop() audio_track.stop() await recorder.stop() print(f"✅ 文件已保存:{output_file}") # -------------------------- 5. 运行入口 -------------------------- if __name__ == "__main__": import cv2 # 延迟导入,避免启动报错 # 列出所有 音频设备 all_audio_devices = list_audio_devices() # 运行录制(录制10秒,输出screen_record.mp4) try: # asyncio.run(record_screen_to_mp4(output_file="screen_record.mp4", duration=10)) # 第二步:录制系统声音为MP3(5秒) if all_audio_devices: record_system_audio_to_mp3(output_mp3="system_audio.mp3", duration=5) # 第三步:屏幕60帧/秒截图(5秒)并合成MP4 capture_screen_60fps(output_dir="screen_shots", duration=5, fps=60) pass except KeyboardInterrupt: print("\n🛑 录制被手动终止") except Exception as e: print(f"❌ 录制出错:{str(e)}") print("💡 排查步骤:") print(" 1. 确认FFmpeg已添加到系统PATH") print(" 2. 确认依赖包已安装:pip install -r requirements.txt") print(" 3. 以管理员身份运行脚本") # -------------------------- 备注:系统音频采集(进阶) -------------------------- # 如需采集系统播放的音频(而非麦克风),需替换SystemAudioTrack为以下逻辑: # 1. 安装虚拟音频线(VB-Cable):https://vb-audio.com/Cable/ # 2. 将系统音频输出设置为VB-Cable # 3. 将SystemAudioTrack的设备ID指定为VB-Cable的输入设备ID # 示例: # def _start_audio_capture(self): # # 列出所有音频设备,找到VB-Cable的ID # devices = sd.query_devices() # cable_id = None # for idx, dev in enumerate(devices): # if "Cable" in dev['name'] and dev['max_input_channels'] > 0: # cable_id = idx # break # if cable_id is None: # raise Exception("未找到VB-Cable虚拟音频设备") # # 用VB-Cable ID启动采集 # self.audio_stream = sd.InputStream( # device=cable_id, # samplerate=self.sample_rate, # channels=self.channels, # callback=self.audio_callback, # blocksize=1024 # ) # self.audio_stream.start()