Some of my recent projects need human pose estimation. There are several well-known open-source projects for this, such as CMU's OpenPose and SJTU's AlphaPose, and both work very well. Yesterday I spent a whole day setting up the environment for AlphaPose, and when I finally got to run the demo it ran out of GPU memory... So I changed my approach: my project has no real-time requirement, so calling a cloud API is acceptable. Baidu's body analysis API turned out to be quite usable (the QPS limit is 2, which is enough for my purposes), and that is how this article came about.
This article calls Baidu's public body keypoint detection API to detect keypoints and then draws the results with Tkinter, Python's lightweight GUI library. The most direct way would of course be to display the output with OpenCV, but we usually need some interaction, so Tkinter is used instead.
1. Obtaining the Baidu API
To use Baidu's API, register an account on the Baidu AI website and create an application under the body analysis service. This gives you an APP_ID, API_KEY, and SECRET_KEY, which are all you need to call the API. The calling conventions are described in the API documentation: https://ai.baidu.com/ai-doc/BODY/0k3cpyxme
To call it from Python, first install Baidu's SDK:
pip install baidu-aip
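Once the SDK is installed, a minimal call looks roughly like the sketch below. The credentials and image path are placeholders; the rest of this article wraps this client in a small helper class.

from aip import AipBodyAnalysis

# Placeholder credentials from your Baidu AI console application
APP_ID = "your_app_id"
API_KEY = "your_api_key"
SECRET_KEY = "your_secret_key"

client = AipBodyAnalysis(APP_ID, API_KEY, SECRET_KEY)

with open("test.jpg", "rb") as f:          # placeholder image
    result = client.bodyAnalysis(f.read())  # send raw JPEG bytes
print(result.get("person_num"))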
2. Getting and drawing the body keypoints
The Baidu API can be called through the following class:
from aip import AipBodyAnalysis
import cv2
import time
import config as cfg   # config.py holds APP_ID / API_KEY / SECRET_KEY (see section 4)


class BaiduAIP(object):
    def __init__(self):
        self.client = AipBodyAnalysis(cfg.APP_ID, cfg.API_KEY, cfg.SECRET_KEY)

    def bodyAnalysis(self, img_jpg):
        retval, buffer = cv2.imencode('.jpg', img_jpg)
        result = self.client.bodyAnalysis(buffer)  # the SDK converts the buffer to base64 internally
        return result
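For reference, the drawing code below only relies on a handful of fields in the returned dict. Judging by what the code accesses, the response looks roughly like the sketch below (the values are made up, and score fields and other keys are omitted):

# Illustrative shape only, inferred from the fields used in this article
result = {
    "log_id": 1234567890,
    "person_num": 1,
    "person_info": [
        {
            "body_parts": {
                "nose": {"x": 321.5, "y": 180.2},
                "neck": {"x": 320.0, "y": 230.7},
                # ... left/right shoulder, elbow, wrist, hip, knee, ankle, ...
            },
            "location": {"left": 250.0, "top": 120.0, "width": 140.0, "height": 400.0},
        }
    ],
}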
Next are the functions that draw the keypoints, the connecting lines, and the bounding box:
def draw_line(img, dic, text):
    color = (0, 255, 0)
    thickness = 2
    if text == 'warn':
        color = (0, 0, 255)
    # nose ---> neck
    cv2.line(img, (int(dic['nose']['x']), int(dic['nose']['y'])), (int(dic['neck']['x']), int(dic['neck']['y'])), color, thickness)
    # neck --> left_shoulder
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['left_shoulder']['x']), int(dic['left_shoulder']['y'])), color, thickness)
    # neck --> right_shoulder
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['right_shoulder']['x']), int(dic['right_shoulder']['y'])), color, thickness)
    # left_shoulder --> left_elbow
    cv2.line(img, (int(dic['left_shoulder']['x']), int(dic['left_shoulder']['y'])), (int(dic['left_elbow']['x']), int(dic['left_elbow']['y'])), color, thickness)
    # left_elbow --> left_wrist
    cv2.line(img, (int(dic['left_elbow']['x']), int(dic['left_elbow']['y'])), (int(dic['left_wrist']['x']), int(dic['left_wrist']['y'])), color, thickness)
    # right_shoulder --> right_elbow
    cv2.line(img, (int(dic['right_shoulder']['x']), int(dic['right_shoulder']['y'])), (int(dic['right_elbow']['x']), int(dic['right_elbow']['y'])), color, thickness)
    # right_elbow --> right_wrist
    cv2.line(img, (int(dic['right_elbow']['x']), int(dic['right_elbow']['y'])), (int(dic['right_wrist']['x']), int(dic['right_wrist']['y'])), color, thickness)
    # neck --> left_hip
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['left_hip']['x']), int(dic['left_hip']['y'])), color, thickness)
    # neck --> right_hip
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['right_hip']['x']), int(dic['right_hip']['y'])), color, thickness)
    # left_hip --> left_knee
    cv2.line(img, (int(dic['left_hip']['x']), int(dic['left_hip']['y'])), (int(dic['left_knee']['x']), int(dic['left_knee']['y'])), color, thickness)
    # right_hip --> right_knee
    cv2.line(img, (int(dic['right_hip']['x']), int(dic['right_hip']['y'])), (int(dic['right_knee']['x']), int(dic['right_knee']['y'])), color, thickness)
    # left_knee --> left_ankle
    cv2.line(img, (int(dic['left_knee']['x']), int(dic['left_knee']['y'])), (int(dic['left_ankle']['x']), int(dic['left_ankle']['y'])), color, thickness)
    # right_knee --> right_ankle
    cv2.line(img, (int(dic['right_knee']['x']), int(dic['right_knee']['y'])), (int(dic['right_ankle']['x']), int(dic['right_ankle']['y'])), color, thickness)


def draw_point(img, dic, text):
    color = (0, 255, 0)
    thickness = 2
    if text == 'warn':
        color = (0, 0, 255)
    for i in dic:
        cv2.circle(img, (int(dic[i]['x']), int(dic[i]['y'])), 5, color, thickness)


def draw_box(img, dic, text):
    color = (255, 0, 0)
    if text == 'warn':
        color = (0, 0, 255)
    left_top = (int(dic['left']), int(dic['top']))
    left_bottom = (int(dic['left']), int(dic['top'] + dic['height']))
    right_bottom = (int(dic['left'] + dic['width']), int(dic['top'] + dic['height']))
    right_top = (int(dic['left'] + dic['width']), int(dic['top']))
    cv2.line(img, left_top, left_bottom, color, 2)
    cv2.line(img, left_top, right_top, color, 2)
    cv2.line(img, right_bottom, left_bottom, color, 2)
    cv2.line(img, right_bottom, right_top, color, 2)
    cv2.putText(img, text, (int(dic['left']), int(dic['top']) + 20), cv2.FONT_HERSHEY_COMPLEX, 1, color, 1)
A quick test:
if __name__ == '__main__':
    baiduapi = BaiduAIP()
    img = cv2.imread('/media/chen/chen/Photo/others/littleCookies.jpg')
    t1 = time.time()
    d = baiduapi.bodyAnalysis(img)
    t2 = time.time()
    print("api time= " + str(t2 - t1))
    print(d["person_num"])
    print(d["log_id"])
    persion = d["person_info"]
    for p in persion:
        draw_line(img, p['body_parts'], 'ok')
        draw_point(img, p['body_parts'], 'ok')
        draw_box(img, p['location'], 'beauty')
    cv2.imwrite("image1.jpg", img)
The result:
3. Building the Tkinter interface
First, define the window and its widgets:
IMG_HEIGT = 352 * 2
IMG_WIDTH = 640 * 2

# Root window
window = tk.Tk()

# Display variables
num_people = StringVar()
num_people.set('Current count: 0')
num_desertion = StringVar()
num_desertion.set('Distracted: 0')

# Build the UI
window.title("Smart Classroom Manager")
sw = window.winfo_screenwidth()   # screen width
sh = window.winfo_screenheight()  # screen height
# Set the window size and center it on screen
wx = IMG_WIDTH + 100
wh = IMG_HEIGT + 100
window.geometry("%dx%d+%d+%d" % (wx, wh, (sw - wx) / 2, (sh - wh) / 2 - 100))  # place the window

# Info bar at the top
fm1 = Frame(window)
Label(fm1, text="Total: 20").pack(side='left')
label_num = Label(fm1, textvariable=num_people).pack(side='left')
label_num = Label(fm1, textvariable=num_desertion).pack(side='left')
fm1.pack(side='top', padx=10)

# Control bar on the left
fm2 = Frame(window)
fm2.pack(side='left', padx=10)

canvas1 = tk.Canvas(window, bg="#c4c2c2", height=IMG_HEIGT, width=IMG_WIDTH)  # canvas for video frames
canvas1.pack(side="left")

job = ImgProcThread()
bt_start = tk.Button(fm2, text="Start", height=2, width=15, command=job.start).pack(side="top")
bt_pause = tk.Button(fm2, text="Pause", height=2, width=15, command=job.pause).pack(side="top")
bt_resume = tk.Button(fm2, text="Resume", height=2, width=15, command=job.resume).pack(side="top")
bt_stop = tk.Button(fm2, text="Stop thread", height=2, width=15, command=job.stop).pack(side="top")
bt_quit = tk.Button(fm2, text="Quit", height=2, width=15, command=window.quit).pack(side="top")

window.mainloop()
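Before getting to the processing thread, it is worth isolating the trick used later to show an OpenCV frame on this canvas: convert the BGR frame to RGBA, wrap it in a PIL Image, then an ImageTk.PhotoImage, and draw it with canvas.create_image. A minimal standalone sketch (the video path is a placeholder):

import cv2
import tkinter as tk
from PIL import Image, ImageTk

window = tk.Tk()
canvas = tk.Canvas(window, width=640, height=352)
canvas.pack()

capture = cv2.VideoCapture("test.mp4")  # placeholder path
ok, frame = capture.read()
if ok:
    # OpenCV delivers BGR; convert before handing the frame to PIL/Tkinter
    rgba = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
    photo = ImageTk.PhotoImage(Image.fromarray(rgba))
    canvas.create_image(0, 0, anchor="nw", image=photo)  # keep a reference to `photo` alive

window.mainloop()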
So what is ImgProcThread? Tkinter provides no widget for playing video directly, so we display frames on a canvas and move the keypoint detection and image processing into a separate thread. Python's built-in thread class, however, has no pause or stop functionality, so we implement blocking and termination ourselves, as follows:
class ImgProcThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super(ImgProcThread, self).__init__(*args, **kwargs)
        self.__flag = threading.Event()     # flag used to pause the thread
        self.__flag.set()                   # set to True
        self.__running = threading.Event()  # flag used to stop the thread
        self.__running.set()                # set running to True

    def run(self):
        capture = cv2.VideoCapture('/media/chen/chen/Photo/others/1.mp4')
        while self.__running.isSet() and capture.isOpened():
            self.__flag.wait()  # returns immediately when True, otherwise blocks until the flag is set
            pass

    def pause(self):
        self.__flag.clear()     # set to False to block the thread

    def resume(self):
        self.__flag.set()       # set to True to unblock the thread

    def stop(self):
        self.__flag.set()       # resume the thread first, in case it is paused
        self.__running.clear()  # set to False so run() exits
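The GUI buttons defined above simply call these methods; in isolation the lifecycle looks like this (a sketch, not part of the original program):

job = ImgProcThread()
job.start()    # starts run(), which begins reading and processing frames
job.pause()    # run() blocks at self.__flag.wait() after the current iteration
job.resume()   # run() continues
job.stop()     # clears the running flag so the while loop exits and the thread ends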
4. The complete program
This program was written around my own requirements, so some of the code may look like puzzling choices to other people; feel free to skip those parts.
config.py
APP_ID="***" API_KEY="***" SECRET_KEY="***"
aip_bodyanalysis.py
from aip import AipBodyAnalysis
import sys
if '/opt/ros/kinetic/lib/python2.7/dist-packages' in sys.path:
    sys.path.remove('/opt/ros/kinetic/lib/python2.7/dist-packages')
import cv2
import os
import config as cfg
import base64
import time


def pose_analyse(img, dic):
    nose = (int(dic['nose']['x']), int(dic['nose']['y']))
    neck = (int(dic['neck']['x']), int(dic['neck']['y']))
    if neck[1] <= nose[1]:
        return 'warn'
    return 'ok'


def draw_line(img, dic, text):
    color = (0, 255, 0)
    thickness = 2
    if text == 'warn':
        color = (0, 0, 255)
    # nose ---> neck
    cv2.line(img, (int(dic['nose']['x']), int(dic['nose']['y'])), (int(dic['neck']['x']), int(dic['neck']['y'])), color, thickness)
    # neck --> left_shoulder
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['left_shoulder']['x']), int(dic['left_shoulder']['y'])), color, thickness)
    # neck --> right_shoulder
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['right_shoulder']['x']), int(dic['right_shoulder']['y'])), color, thickness)
    # left_shoulder --> left_elbow
    cv2.line(img, (int(dic['left_shoulder']['x']), int(dic['left_shoulder']['y'])), (int(dic['left_elbow']['x']), int(dic['left_elbow']['y'])), color, thickness)
    # left_elbow --> left_wrist
    cv2.line(img, (int(dic['left_elbow']['x']), int(dic['left_elbow']['y'])), (int(dic['left_wrist']['x']), int(dic['left_wrist']['y'])), color, thickness)
    # right_shoulder --> right_elbow
    cv2.line(img, (int(dic['right_shoulder']['x']), int(dic['right_shoulder']['y'])), (int(dic['right_elbow']['x']), int(dic['right_elbow']['y'])), color, thickness)
    # right_elbow --> right_wrist
    cv2.line(img, (int(dic['right_elbow']['x']), int(dic['right_elbow']['y'])), (int(dic['right_wrist']['x']), int(dic['right_wrist']['y'])), color, thickness)
    # neck --> left_hip
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['left_hip']['x']), int(dic['left_hip']['y'])), color, thickness)
    # neck --> right_hip
    cv2.line(img, (int(dic['neck']['x']), int(dic['neck']['y'])), (int(dic['right_hip']['x']), int(dic['right_hip']['y'])), color, thickness)
    # left_hip --> left_knee
    cv2.line(img, (int(dic['left_hip']['x']), int(dic['left_hip']['y'])), (int(dic['left_knee']['x']), int(dic['left_knee']['y'])), color, thickness)
    # right_hip --> right_knee
    cv2.line(img, (int(dic['right_hip']['x']), int(dic['right_hip']['y'])), (int(dic['right_knee']['x']), int(dic['right_knee']['y'])), color, thickness)
    # left_knee --> left_ankle
    cv2.line(img, (int(dic['left_knee']['x']), int(dic['left_knee']['y'])), (int(dic['left_ankle']['x']), int(dic['left_ankle']['y'])), color, thickness)
    # right_knee --> right_ankle
    cv2.line(img, (int(dic['right_knee']['x']), int(dic['right_knee']['y'])), (int(dic['right_ankle']['x']), int(dic['right_ankle']['y'])), color, thickness)


def draw_point(img, dic, text):
    color = (0, 255, 0)
    thickness = 2
    if text == 'warn':
        color = (0, 0, 255)
    for i in dic:
        cv2.circle(img, (int(dic[i]['x']), int(dic[i]['y'])), 5, color, thickness)


def draw_box(img, dic, text):
    color = (255, 0, 0)
    if text == 'warn':
        color = (0, 0, 255)
    left_top = (int(dic['left']), int(dic['top']))
    left_bottom = (int(dic['left']), int(dic['top'] + dic['height']))
    right_bottom = (int(dic['left'] + dic['width']), int(dic['top'] + dic['height']))
    right_top = (int(dic['left'] + dic['width']), int(dic['top']))
    cv2.line(img, left_top, left_bottom, color, 2)
    cv2.line(img, left_top, right_top, color, 2)
    cv2.line(img, right_bottom, left_bottom, color, 2)
    cv2.line(img, right_bottom, right_top, color, 2)
    cv2.putText(img, text, (int(dic['left']), int(dic['top']) + 20), cv2.FONT_HERSHEY_COMPLEX, 1, color, 1)


class BaiduAIP(object):
    def __init__(self):
        self.client = AipBodyAnalysis(cfg.APP_ID, cfg.API_KEY, cfg.SECRET_KEY)

    def bodyAnalysis(self, img_jpg):
        retval, buffer = cv2.imencode('.jpg', img_jpg)
        result = self.client.bodyAnalysis(buffer)  # the SDK converts the buffer to base64 internally
        return result


if __name__ == '__main__':
    baiduapi = BaiduAIP()
    img = cv2.imread('/media/chen/chen/Photo/others/littleCookies.jpg')
    t1 = time.time()
    d = baiduapi.bodyAnalysis(img)
    t2 = time.time()
    print("api time= " + str(t2 - t1))
    print(d["person_num"])
    print(d["log_id"])
    persion = d["person_info"]
    for p in persion:
        draw_line(img, p['body_parts'], 'ok')
        draw_point(img, p['body_parts'], 'ok')
        draw_box(img, p['location'], 'beauty')
    t3 = time.time()
    print("draw time= " + str(t3 - t2))
    cv2.imwrite("image1.jpg", img)
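The pose_analyse helper above drives the 'warn' status used in the GUI: if the neck keypoint is at or above the nose in the image (remember that y grows downward), the pose is flagged and drawn in red. A quick illustration with made-up keypoints:

# Fabricated keypoints just to exercise pose_analyse; the img argument is not used by the function
upright = {'nose': {'x': 100, 'y': 80},  'neck': {'x': 100, 'y': 140}}
slumped = {'nose': {'x': 100, 'y': 150}, 'neck': {'x': 100, 'y': 140}}
print(pose_analyse(None, upright))  # 'ok'   (neck below the nose)
print(pose_analyse(None, slumped))  # 'warn' (neck at or above the nose)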
tkinter_body.py
import sys
if '/opt/ros/kinetic/lib/python2.7/dist-packages' in sys.path:
    sys.path.remove('/opt/ros/kinetic/lib/python2.7/dist-packages')
import cv2
import tkinter as tk
from tkinter import *            # tkinter widgets
from PIL import Image, ImageTk   # image widgets
import threading                 # multithreading
from aip_bodyanalysis import *
import time

IMG_HEIGT = 352 * 2
IMG_WIDTH = 640 * 2

aip = BaiduAIP()
is_run = True

# Thread lock
lock = threading.Lock()

# Root window
window = tk.Tk()

# Display variables
num_people = StringVar()
num_people.set('Current count: 0')
num_desertion = StringVar()
num_desertion.set('Abnormal: 0')

# Build the UI
window.title("Test")
sw = window.winfo_screenwidth()   # screen width
sh = window.winfo_screenheight()  # screen height
# Set the window size and center it on screen
wx = IMG_WIDTH + 100
wh = IMG_HEIGT + 100
window.geometry("%dx%d+%d+%d" % (wx, wh, (sw - wx) / 2, (sh - wh) / 2 - 100))  # place the window

# Info bar at the top
fm1 = Frame(window)
Label(fm1, text="Total: 20").pack(side='left')
label_num = Label(fm1, textvariable=num_people).pack(side='left')
label_num = Label(fm1, textvariable=num_desertion).pack(side='left')
fm1.pack(side='top', padx=10)

# Control bar on the left
fm2 = Frame(window)
fm2.pack(side='left', padx=10)

canvas1 = tk.Canvas(window, bg="#c4c2c2", height=IMG_HEIGT, width=IMG_WIDTH)  # canvas for video frames
canvas1.pack(side="left")


def getResult(img):
    d = aip.bodyAnalysis(img)
    if "person_info" not in d:
        return
    persion = d["person_info"]
    # Number of people detected
    num = len(persion)
    num_people.set("Current count: " + str(num))
    # Draw each person's pose
    for p in persion:
        status = pose_analyse(img, p['body_parts'])
        draw_line(img, p['body_parts'], status)
        draw_point(img, p['body_parts'], status)
        draw_box(img, p['location'], status)


def cc():
    capture = cv2.VideoCapture('/media/chen/chen/Photo/others/1.mp4')
    # capture = cv2.VideoCapture(0)
    while capture.isOpened():
        t0 = time.time()
        _, frame = capture.read()
        # Read ahead several frames to clear the capture buffer
        for i in range(15):
            _, frame = capture.read()
        getResult(frame)
        frame = cv2.flip(frame, 1)  # flip: 0 = vertical, >0 = horizontal
        cv2image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
        img = Image.fromarray(cv2image)
        image_file = ImageTk.PhotoImage(img)
        canvas1.create_image(0, 0, anchor="nw", image=image_file)
        print(time.time() - t0)
    print("ending")


class ImgProcThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super(ImgProcThread, self).__init__(*args, **kwargs)
        self.__flag = threading.Event()     # flag used to pause the thread
        self.__flag.set()                   # set to True
        self.__running = threading.Event()  # flag used to stop the thread
        self.__running.set()                # set running to True

    def run(self):
        capture = cv2.VideoCapture('/media/chen/chen/Photo/others/1.mp4')
        while self.__running.isSet() and capture.isOpened():
            self.__flag.wait()  # returns immediately when True, otherwise blocks until the flag is set
            t0 = time.time()
            _, frame = capture.read()
            # Read ahead several frames to clear the capture buffer
            for i in range(15):
                _, frame = capture.read()
            frame = cv2.resize(frame, (IMG_WIDTH, IMG_HEIGT), interpolation=cv2.INTER_NEAREST)
            getResult(frame)
            # frame = cv2.flip(frame, 1)  # flip: 0 = vertical, >0 = horizontal
            cv2image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
            img = Image.fromarray(cv2image)
            image_file = ImageTk.PhotoImage(img)
            canvas1.create_image(0, 0, anchor="nw", image=image_file)
            print(time.time() - t0)

    def pause(self):
        self.__flag.clear()     # set to False to block the thread

    def resume(self):
        self.__flag.set()       # set to True to unblock the thread

    def stop(self):
        self.__flag.set()       # resume the thread first, in case it is paused
        self.__running.clear()  # set to False so run() exits


def video_demo():
    t = threading.Thread(target=cc)
    t.start()


job = ImgProcThread()
bt_start = tk.Button(fm2, text="Start", height=2, width=15, command=job.start).pack(side="top")
bt_pause = tk.Button(fm2, text="Pause", height=2, width=15, command=job.pause).pack(side="top")
bt_resume = tk.Button(fm2, text="Resume", height=2, width=15, command=job.resume).pack(side="top")
bt_stop = tk.Button(fm2, text="Stop thread", height=2, width=15, command=job.stop).pack(side="top")
bt_quit = tk.Button(fm2, text="Quit", height=2, width=15, command=window.quit).pack(side="top")

window.mainloop()
Execution results: