怎么学习pythonn游戏如何解决游戏操作中产生的不便

使用Python写一个小游戏
&更新时间:日 16:10:22 & 作者:人工智能的秘密
这篇文章主要介绍了使用Python快速写一个小游戏,本次开发的小游戏叫alien invasion,具体实现过程大家参考下本文
最近python语言大火,除了在科学计算领域python有用武之地之外,在游戏、后台等方面,python也大放异彩,本篇博文将按照正规的项目开发流程,手把手教大家写个python小游戏,来感受下其中的有趣之处。本次开发的游戏叫做alien invasion。
安装pygame并创建能左右移动的飞船
安装pygame
本人电脑是windows 10、python3.6,pygame下载地址:
请自行下载对应python版本的pygame 运行以下命令
$ pip install wheel
$ pip install pygame&#.3‑cp36‑cp36m‑win_amd64.whl
创建Pygame窗口及响应用户输入
新建一个文件夹alien_invasion,并在文件夹中新建alien_invasion.py文件,输入如下代码。
import sys
import pygame
def run_game():
#initialize game and create a dispaly object
pygame.init()
screen = pygame.display.set_mode(())
pygame.display.set_caption("Alien Invasion")
# set backgroud color
bg_color = (230,230,230)
# game loop
while True:
# supervise keyboard and mouse item
for event in pygame.event.get():
if event.type == pygame.QUIT:
sys.exit()
# fill color
screen.fill(bg_color)
# visualiaze the window
pygame.display.flip()
run_game()
运行上述代码,我们可以得到一个灰色界面的窗口:
$ python alien_invasion.py
创建设置类
为了在写游戏的过程中能便捷地创建一些新功能,下面额外编写一个settings模块,其中包含一个Settings类,用于将所有设置存储在一个地方。这样在以后项目增大时修改游戏的外观就更加容易。 我们首先将alien_invasion.py中的显示屏大小及显示屏颜色进行修改。 首先在alien_invasion文件夹下新建python文件settings.py,并向其中添加如下代码:
class Settings(object):
"""docstring for Settings"""
def __init__(self):
# initialize setting of game
# screen setting
self.screen_width = 1200
self.screen_height = 800
self.bg_color = (230,230,230)
然后再alien_invasion.py中导入Settings类,并使用相关设置,修改如下:
import sys
import pygame
from settings import Settings
def run_game():
#initialize game and create a dispaly object
pygame.init()
ai_settings = Settings()
screen = pygame.display.set_mode((ai_settings.screen_width,ai_settings.screen_height))
pygame.display.set_caption("Alien Invasion")
# set backgroud color
bg_color = (230,230,230)
# game loop
while True:
# supervise keyboard and mouse item
for event in pygame.event.get():
if event.type == pygame.QUIT:
sys.exit()
# fill color
screen.fill(ai_settings.bg_color)
# visualiaze the window
pygame.display.flip()
run_game()
添加飞船图像
接下来,我们需要将飞船加入游戏中。为了在屏幕上绘制玩家的飞船,我们将加载一幅图像,再使用Pygame()方法blit()绘制它。 在游戏中几乎可以使用各种类型的图像文件,但是使用位图(.bmp)文件最为简单,这是因为Pygame默认加载位图。虽然其他类型的图像也能加载,但是需要安装额外的库。我们推荐去免费的图片素材网站上去找图像: 传送门 。我们在主项目文件夹(alien_invasion)中新建一个文件夹叫images,将如下bmp图片放入其中。
接下来,我们创建飞船类ship.py:
import pygame
class Ship():
def __init__(self,screen):
#initialize spaceship and its location
self.screen = screen
# load bmp image and get rectangle
self.image = pygame.image.load('image/ship.bmp')
self.rect = self.image.get_rect()
self.screen_rect = screen.get_rect()
#put spaceship on the bottom of window
self.rect.centerx = self.screen_rect.centerx
self.rect.bottom = self.screen_rect.bottom
def blitme(self):
#buld the spaceship at the specific location
self.screen.blit(self.image,self.rect)
最后我们在屏幕上绘制飞船,即在alien_invasion.py文件中调用blitme方法:
import sys
import pygame
from settings import Settings
from ship import Settings
def run_game():
#initialize game and create a dispaly object
pygame.init()
ai_settings = Settings()
screen = pygame.display.set_mode((ai_settings.screen_width,ai_settings.screen_height))
ship = Ship(screen)
pygame.display.set_caption("Alien Invasion")
# set backgroud color
bg_color = (230,230,230)
# game loop
while True:
# supervise keyboard and mouse item
for event in pygame.event.get():
if event.type == pygame.QUIT:
sys.exit()
# fill color
screen.fill(ai_settings.bg_color)
ship.blitme()
# visualiaze the window
pygame.display.flip()
run_game()
重构:模块game_functions
在大型项目中,经常需要在添加新代码前重构既有代码。重构的目的是为了简化代码的结构,使其更加容易扩展。我们将实现一个game_functions模块,它将存储大量让游戏Alien invasion运行的函数。通过创建模块game_functions,可避免alien_invasion.py太长,使其逻辑更容易理解。
函数check_events()
首先我们将管理事件的代码移到一个名为check_events()的函数中,目的是为了隔离事件循环
import sys
import pygame
def check_events():
#respond to keyboard and mouse item
for event in pygame.event.get():
if event.type == pygame.QUIT:
sys.exit()
然后我们修改alien_invasion.py代码,导入game_functions模块,并将事件循环替换成对函数check_events()的调用:
import sys
import pygame
from settings import Settings
from ship import Ship
import game_functions as gf
def run_game():
#initialize game and create a dispaly object
pygame.init()
ai_settings = Settings()
以上所述是小编给大家介绍的Python写一个小游戏,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具本帖已被设为精华帖!
最近在TesterHome游戏测试群里,有时候会看到有童鞋问,游戏测试人员学了Python,可以干点什么。
很多童鞋初学Python,学习了语法和基础类库后,开始迷茫如何实际使用到工作中去,其实Python可以做的事情是很多的,将日常工作的一些事情自动化,对我们的工作效率有很大的提升。
本文面向Py新手,分享一些辅助工作的小工具思路。以下例子都是在Win10 + Py3.5下完成。
subprocess是Python自带的子进程管理模块,定义有数个创建子进程的函数,也提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
简单理解就是,你通过CMD敲的命令,都基本可以用subprocess来实现批量处理。
例子1:批量SVN操作
以更新SVN为例,这是一个频繁的操作,尤其是多个SVN目录需要一一更新的时候,手动起来是挺麻烦的。
import subprocesssubprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\策划文档" /closeonend:0')subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\配置文档" /closeonend:0')
例子2:adb命令的封装
做安卓手游测试的时候,adb是常用工具,我们可以通过它,进行apk的安装,卸载,截图,获取APK信息,性能数据,获取手机信息等等操作。
比如获取当前运行在前台的apk的package和activity名称
def run_cmd(cmd):
"""执行CMD命令"""
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
return [i.decode() for i in p.communicate()[0].splitlines()]def get_apk_info():
"""获取apk的package,activity名称
:return: list
eg ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
result = run_cmd("adb shell dumpsys activity top")
for line in result:
if line.strip().startswith('ACTIVITY'):
return line.split()[1].split('/')print(get_apk_info())output: ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
比如查看当前apk的内存占用
def get_mem_using(package_name=None):
"""查看apk的内存占用
:param package_name:
:return: 单位KB
if not package_name:
package_name = get_apk_info()[0]
result = run_cmd("adb shell dumpsys meminfo {}".format(package_name))
info = re.search('TOTAL\W+\d+', str(result)).group()
mem = info.split()
except Exception as e:
print(info)
return mem[-1]output: 37769
比如备份当前apk到桌面
def backup_current_apk(path=r"C:\Users\jianbing\Desktop\apks"):
package = get_apk_info()[0]
result = run_cmd("adb shell pm path {}".format(package))
cmd = "adb pull {} {}".format(result[0].split(":")[-1], os.path.join(path, "{}.apk".format(package)))
print(cmd)
run_cmd(cmd)
再进一步,将常用的adb操作封装为一个ADB工具类。社区里也有童鞋之前分享过,。
例子:在整个文件夹中搜索关键字
某天策划说,这个版本他删掉了某个道具,让我检查下有没有删漏的地方,这个道具产出的地方不少,最佳的检查方式是各个相关配置表看下还有没有配置这个道具。
那就写个脚本遍历整个文件夹来搜索指定关键字吧。
import osdef get_files_by_suffix(path, suffixes=("txt", "xml"), traverse=True):
"""从path路径下,找出全部指定后缀名的文件
:param path: 根目录
:param suffixes: 指定查找的文件后缀名
:param traverse: 如果为False,只遍历一层目录
file_list = []
for root, dirs, files in os.walk(path):
for file in files:
file_suffix = os.path.splitext(file)[1][1:].lower()
if file_suffix in suffixes:
file_list.append(os.path.join(root, file))
if not traverse:
return file_list
return file_listif __name__ == '__main__':
keyword = "XXX宝箱"
files = get_files_by_suffix(r"C:\project\config")
for file in files:
with open(file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().lower()
position = content.find(keyword.lower())
if position != -1:
print("Find in {0}".format(file))
start = position - 100 if position - 100 & 0 else 0
end = position + 100 if position + 100 & len(content) else len(content)
print(content[start:end])
print("_" * 100) 操作远程服务器
例子1:查看内网发版时间
有时候问开发,最近一次内网服务端发版是什么时候?开发回答:有点忘记了。。那就得自力更生了~
手动方式:使用FTP软件连入内网服务器,查看文件的更新日期,从而知道发版时间。
懒人方式:Py大法好~
paramiko是Python很有名的第三方库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。
import paramikoimport time_transport = paramiko.Transport("192.168.1.10:22")_transport.connect(username="root", password="XXXXXX")sftp = paramiko.SFTPClient.from_transport(_transport)result = sftp.listdir_attr("/data/www/sg/sg_dev/socket/conf/config/treasure")print("发版时间是:{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result[0].st_mtime))))sftp.close()
例子2:查看内网报错信息
在进行测试的时候,需要多留意服务端是否有新的报错信息,有些报错在客户端并没有什么表现,比如数据进库失败,手动方式:通过SecureCRT连入内网服务器,CD到Log目录下,然后tail -n 200 sg_error.log 查看最新的报错信息。
于是萌生了写一个小工具来定时检测,发现报错信息就保存起来的想法。
import datetimeimport paramikoimport timeimport osclass ScanError(object):
def __init__(self):
self._ssh = paramiko.SSHClient()
self.last_error_log = None
self._init()
def _init(self):
os.chdir("data")
# 打算将报错信息保存到data目录下
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect("192.168.1.10", username="root", password="XXXXXX")
error_log = self.get_error_log(500)
self.last_error_log = error_log
# 检测最近三天有没有报错信息
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
the_day_before_yesterday = today - datetime.timedelta(days=2)
error_log_str = "\n".join(error_log)
if error_log_str.find(str(today)) & -1 or error_log_str.find(str(yesterday)) & -1 or error_log_str.find(str(the_day_before_yesterday)) & -1:
self.save_error_log("error.txt", error_log)
print('内网最近三天有错误信息,请查看')
os.popen('error.txt')
def get_error_log(self, num=200):
cmd = 'cd /data/www/sg/sg_dev/socket/log&&tail -n {} sg_error.log'.format(num)
stdin, stdout, stderr = self._ssh.exec_command(cmd)
error_log = [i.decode("utf-8") for i in stdout.read().splitlines() if i]
return error_log
@staticmethod
def save_error_log(file_name, log: list):
with open(file_name, 'w', encoding='utf-8') as f:
f.write("\n".join(log))
def run_forever(self, interval=30, show_error=True):
"""运行检测工具
:param interval: 检测间隔
:param show_error: 是否检测到报错就自动弹出显示
time.sleep(interval)
error_log = self.get_error_log()
if error_log != self.last_error_log and "\n".join(set(error_log) - set(self.last_error_log)).find("ERROR") & -1:
self.last_error_log = error_log
file_name = time.strftime("%Y-%m-%d-%H-%M-%S.txt", time.localtime(time.time()))
print('{} 检测到内网有新的错误信息'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
self.save_error_log(file_name, error_log)
if show_error:
os.popen(file_name)if __name__ == '__main__':
ScanError().run_forever() 数据库操作
例子:找号功能
内网的服务器建了N多的号,有时候看着排行榜某个帐号,想登录看下数据,可以使用Python写一个连接数据库的找号脚本。
import cymysqlplayer_name = "XXXXXX"conn = cymysql.connect(host='XXXXXX', user='sg', passwd='XXXXXX', db="dev", charset='utf8')cur = conn.cursor()sql = "select * from Player where name like '%{0}%'".format(player_name)
# 模糊搜索,从玩家名称搜索玩家IDcur.execute(sql)for r in cur.fetchall():
sql = "select * from Account where uid = '{0}'".format(r[0])
# 从玩家ID搜索玩家帐号
cur.execute(sql)
for row in cur.fetchall():
print('{0}, {1}, {2}'.format(r[0], r[1], row[2]))
# 打印相关信息conn.close() 扩展开发提供的工具
在之前某个项目,开发做了一个给游戏帐号发道具的网页,提供测试使用,操作流程是这样的,在网页上的表单里边,填写玩家的ID,在下拉列表选中要发送的道具(支持模糊搜索),填写数量。
这个网页使用起来,工作效率不高的地方就是,每次添加道具,都需要重新选择道具和填写数量,且添加过程没有记录下来,无法复用。
优化方案,在网页上点击添加道具,其实就是网页给游戏服务器发送了一个HTTP请求,那就直接让Python来代劳吧~
import requestsplayer_id = server_ip = "192.168.1.21:5000"data = {"stuffList": []}url = "http://{}/api/{}/stuff".format(server_ip, player_id)data["stuffList"].append({"itemID": , "number": 1000})
# itemID为的物品,数量1000data["stuffList"].append({"itemID": , "number": 1000})data["stuffList"].append({"itemID": , "number": 1000})data["stuffList"].append({"itemID": , "number": 1000})data["stuffList"].append({"itemID": , "number": 1000})data["stuffList"].append({"itemID": , "number": 1000})requests.post(url, json=data, timeout=5)
# 添加道具# requests.delete(url, json=data, timeout=5)
# 删除道具
有没有发现,每个脚本都很简短~
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
将本帖设为了精华贴
加精理由:赞想法 虽然技术不算很牛但是实用 读起来赏心悦目
不错的分享,有不少指导意义
例子2:查看内网报错信息的脚本中,
def save_error_log(file_name, log: list):
参数 log: list
是个什么写法?
Py3.5新增的类型提示,具体可以看
不错 拜读了 受益匪浅
不错 正好在学习这块
顶一个,慢慢吸收
name == 'main'
请问下name需要自己声明吗
__name__ 是表示当前模块名字的变量,当模块被直接运行时模块名为__main__ 。这句话的意思就是,当模块被直接运行时,以下代码块将被运行,当模块是被导入时,代码块不被运行。
好的,谢谢
[该话题已被删除]
中提及了此贴
楼主你好,"adb命令的封装"例子,不知道如何改才能在mac os系统下运行,通过报错来找解决的方法。终端输入
$ python XXXXXXXXX/python_test/ex36.py 出现如下报错。根据OSError: [Errno 2] No such file or directory,应该是没找到对应文件。
Traceback (most recent call last):
File "/Users/liruiyi/PycharmProjects/python_test/ex36.py", line 22, in
print(get_apk_info())
File "/Users/liruiyi/PycharmProjects/python_test/ex36.py", line 17, in get_apk_info
result = run_cmd("adb shell dumpsys activity top")
File "/Users/liruiyi/PycharmProjects/python_test/ex36.py", line 8, in run_cmd
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 390, in init
errread, errwrite)
File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 1024, in _execute_child
raise child_exception
OSError: [Errno 2] No such file or directory
很好。学习了。~~~
后方可回复, 如果你还没有账号请点击这里 。
jianjianjianbing (煎饼)
第 6493 位会员 /
共收到 12 条回复51CTO旗下网站
使用Python和Asyncio编写在线多人游戏(二)
你在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。
作者:Kyrylo Subbotin来源:| 21:24
在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。
介绍和理论部分参见&第一部分 异步化&。
3、编写游戏循环主体
游戏循环是每一个游戏的核心。它持续地运行以读取玩家的输入、更新游戏的状态,并且在屏幕上渲染游戏结果。在在线游戏中,游戏循环分为客户端和服务端两部分,所以一般有两个循环通过网络通信。通常客户端的角色是获取玩家输入,比如按键或者鼠标移动,将数据传输给服务端,然后接收需要渲染的数据。服务端处理来自玩家的所有数据,更新游戏的状态,执行渲染下一帧的必要计算,然后将结果传回客户端,例如游戏中对象的新位置。如果没有可靠的理由,不混淆客户端和服务端的角色是一件很重要的事。如果你在客户端执行游戏逻辑的计算,很容易就会和其它客户端失去同步,其实你的游戏也可以通过简单地传递客户端的数据来创建。
游戏循环的一次迭代称为一个嘀嗒(tick)。嘀嗒是一个事件,表示当前游戏循环的迭代已经结束,下一帧(或者多帧)的数据已经就绪。
在后面的例子中,我们使用相同的客户端,它使用 WebSocket
从一个网页上连接到服务端。它执行一个简单的循环,将按键码发送给服务端,并显示来自服务端的所有信息。客户端代码戳这里。
例子 3.1:基本游戏循环
我们使用 aiohttp 库来创建游戏服务器。它可以通过 asyncio 创建网页服务器和客户端。这个库的一个优势是它同时支持普通 http 请求和
websocket。所以我们不用其他网页服务器来渲染游戏的 html 页面。
下面是启动服务器的方法:
app&=&web.Application()&app[&sockets&]&=&[]&asyncio.ensure_future(game_loop(app))&app.router.add_route('GET',&'/connect',&wshandler)&app.router.add_route('GET',&'/',&handle)&web.run_app(app)&
web.run_app 是创建服务主任务的快捷方法,通过它的 run_forever() 方法来执行 asyncio
事件循环。建议你查看这个方法的源码,弄清楚服务器到底是如何创建和结束的。
变量就是一个类似于字典的对象,它用于在所连接的客户端之间共享数据。我们使用它来存储连接的套接字的列表。随后会用这个列表来给所有连接的客户端发送消息。asyncio.ensure_future()
调用会启动主游戏循环的任务,每隔2 秒向客户端发送嘀嗒消息。这个任务会在同样的 asyncio 事件循环中和网页服务器并行执行。
有两个网页请求处理器:handle 是提供 html 页面的处理器;wshandler 是主要的 websocket
服务器任务,处理和客户端之间的交互。在事件循环中,每一个连接的客户端都会创建一个新的 wshandler 任务。这个任务会添加客户端的套接字到列表中,以便
game_loop 任务可以给所有的客户端发送消息。然后它将随同消息回显客户端的每个击键。
在启动的任务中,我们在 asyncio 的主事件循环中启动 worker 循环。任务之间的切换发生在它们之间任何一个使用
await语句来等待某个协程结束时。例如 asyncio.sleep 仅仅是将程序执行权交给调度器一段指定的时间;ws.receive 等待 websocket
的消息,此时调度器可能切换到其它任务。
在浏览器中打开主页,连接上服务器后,试试随便按下键。它们的键值会从服务端返回,每隔 2 秒这个数字会被游戏循环中发给所有客户端的嘀嗒消息所覆盖。
我们刚刚创建了一个处理客户端按键的服务器,主游戏循环在后台做一些处理,周期性地同时更新所有的客户端。
例子 3.2: 根据请求启动游戏
在前一个例子中,在服务器的生命周期内,游戏循环一直运行着。但是现实中,如果没有一个人连接服务器,空运行游戏循环通常是不合理的。而且,同一个服务器上可能有不同的&游戏房间&。在这种假设下,每一个玩家&创建&一个游戏会话(比如说,多人游戏中的一个比赛或者大型多人游戏中的副本),这样其他用户可以加入其中。当游戏会话开始时,游戏循环才开始执行。
在这个例子中,我们使用一个全局标记来检测游戏循环是否在执行。当第一个用户发起连接时,启动它。最开始,游戏循环没有执行,标记设置为
False。游戏循环是通过客户端的处理方法启动的。
if&app[&game_is_running&]&==&False:&&&&&&&asyncio.ensure_future(game_loop(app))&
当 game_loop() 运行时,这个标记设置为 T当所有客户端都断开连接时,其又被设置为 False。
例子 3.3:管理任务
这个例子用来解释如何和任务对象协同工作。我们把游戏循环的任务直接存储在游戏循环的全局字典中,代替标记的使用。在像这样的一个简单例子中并不一定是最优的,但是有时候你可能需要控制所有已经启动的任务。
if&app[&game_loop&]&is&None&or&\&&&&app[&game_loop&].cancelled():&&&&&app[&game_loop&]&=&asyncio.ensure_future(game_loop(app))&
这里 ensure_future() 返回我们存放在全局字典中的任务对象,当所有用户都断开连接时,我们使用下面方式取消任务:
app[&game_loop&].cancel()&
这个 cancel() 调用将通知调度器不要向这个协程传递执行权,而且将它的状态设置为已取消:cancelled,之后可以通过 cancelled()
方法来检查是否已取消。这里有一个值得一提的小注意点:当你持有一个任务对象的外部引用时,而这个任务执行中发生了异常,这个异常不会抛出。取而代之的是为这个任务设置一个异常状态,可以通过
exception()
方法来检查是否出现了异常。这种悄无声息地失败在调试时不是很有用。所以,你可能想用抛出所有异常来取代这种做法。你可以对所有未完成的任务显式地调用 result()
来实现。可以通过如下的回调来实现:
app[&game_loop&].add_done_callback(lambda&t:&t.result())&
如果我们打算在我们代码中取消这个任务,但是又不想产生 CancelError 异常,有一个检查 cancelled 状态的点:
app[&game_loop&].add_done_callback(lambda&t:&t.result()&if&not&t.cancelled()&else&None)&
注意仅当你持有任务对象的引用时才需要这么做。在前一个例子,所有的异常都是没有额外的回调,直接抛出所有异常。
例子 3.4:等待多个事件
在许多场景下,在客户端的处理方法中你需要等待多个事件的发生。除了来自客户端的消息,你可能需要等待不同类型事件的发生。比如,如果你的游戏时间有限制,那么你可能需要等一个来自定时器的信号。或者你需要使用管道来等待来自其它进程的消息。亦或者是使用分布式消息系统的网络中其它服务器的信息。
为了简单起见,这个例子是基于例子 3.1。但是这个例子中我们使用 Condition
对象来与已连接的客户端保持游戏循环的同步。我们不保存套接字的全局列表,因为只在该处理方法中使用套接字。当游戏循环停止迭代时,我们使用
Condition.notify_all() 方法来通知所有的客户端。这个方法允许在 asyncio 的事件循环中使用发布/订阅的模式。
为了等待这两个事件,首先我们使用 ensure_future() 来封装任务中这个可等待对象。
if&not&recv_task:&&&&&recv_task&=&asyncio.ensure_future(ws.receive())&if&not&tick_task:&&&&&await&tick.acquire()&&&&&tick_task&=&asyncio.ensure_future(tick.wait())&
在我们调用 Condition.wait() 之前,我们需要在它后面获取一把锁。这就是我们为什么先调用 tick.acquire() 的原因。在调用
tick.wait() 之后,锁会被释放,这样其他的协程也可以使用它。但是当我们收到通知时,会重新获取锁,所以在收到通知后需要调用 tick.release()
来释放它。
我们使用 asyncio.wait() 协程来等待两个任务。
done,&pending&=&await&asyncio.wait(&&&&&&&&&[recv_task,&&&&&&&&&&tick_task],&&&&&&&&&return_when=asyncio.FIRST_COMPLETED)&
程序会阻塞,直到列表中的任意一个任务完成。然后它返回两个列表:执行完成的任务列表和仍然在执行的任务列表。如果任务执行完成了,其对应变量赋值为
None,所以在下一个迭代时,它可能会被再次创建。
例子 3.5: 结合多个线程
在这个例子中,我们结合 asyncio 循环和线程,在一个单独的线程中执行主游戏循环。我之前提到过,由于 GIL 的存在,Python
代码的真正并行执行是不可能的。所以使用其它线程来执行复杂计算并不是一个好主意。然而,在使用 asyncio 时结合线程有原因的:当我们使用的其它库不支持
asyncio 时就需要。在主线程中调用这些库会阻塞循环的执行,所以异步使用他们的唯一方法是在不同的线程中使用他们。
我们使用 asyncio 循环的run_in_executor() 方法和 ThreadPoolExecutor 来执行游戏循环。注意
game_loop() 已经不再是一个协程了。它是一个由其它线程执行的函数。然而我们需要和主线程交互,在游戏事件到来时通知客户端。asyncio
本身不是线程安全的,它提供了可以在其它线程中执行你的代码的方法。普通函数有 call_soon_threadsafe(),协程有
run_coroutine_threadsafe()。我们在 notify()
协程中增加了通知客户端游戏的嘀嗒的代码,然后通过另外一个线程执行主事件循环。
def&game_loop(asyncio_loop):&&&&&print(&Game&loop&thread&id&{}&.format(threading.get_ident()))&&&&&async&def&notify():&&&&&&&&&print(&Notify&thread&id&{}&.format(threading.get_ident()))&&&&&&&&&await&tick.acquire()&&&&&&&&&tick.notify_all()&&&&&&&&&tick.release()&&&&&while&1:&&&&&&&&&task&=&asyncio.run_coroutine_threadsafe(notify(),&asyncio_loop)&&&&&&&&&#&blocking&the&thread&&&&&&&&&sleep(1)&&&&&&&&&#&make&sure&the&task&has&finished&&&&&&&&&task.result()&
当你执行这个例子时,你会看到 &Notify thread id& 和 &Main thread id& 相等,因为 notify()
协程在主线程中执行。与此同时 sleep(1) 在另外一个线程中执行,因此它不会阻塞主事件循环。
例子 3.6:多进程和扩展
单线程的服务器可能运行得很好,但是它只能使用一个 CPU
核。为了将服务扩展到多核,我们需要执行多个进程,每个进程执行各自的事件循环。这样我们需要在进程间交互信息或者共享游戏的数据。而且在一个游戏中经常需要进行复杂的计算,例如路径查找之类。这些任务有时候在一个游戏嘀嗒中没法快速完成。在协程中不推荐进行费时的计算,因为它会阻塞事件的处理。在这种情况下,将这个复杂任务交给其它并行执行的进程可能更合理。
最简单的使用多个核的方法是启动多个使用单核的服务器,就像之前的例子中一样,每个服务器占用不同的端口。你可以使用 supervisord
或者其它进程控制的系统。这个时候你需要一个像 HAProxy 这样的负载均衡器,使得连接的客户端分布在多个进程间。已经有一些可以连接 asyncio
和一些流行的消息及存储系统的适配系统。例如:
aiomcache 用于 memcached 客户端
aiozmq 用于 zeroMQ
aioredis 用于 Redis 存储,支持发布/订阅
你可以在 github 或者 pypi 上找到其它的软件包,大部分以 aio 开头。
使用网络服务在存储持久状态和交换某些信息时可能比较有效。但是如果你需要进行进程间通信的实时处理,它的性能可能不足。此时,使用标准的 unix
管道可能更合适。asyncio 支持管道,在aiohttp仓库有个 使用管道的服务器的非常底层的例子。
在当前的例子中,我们使用 Python 的高层类库 multiprocessing 来在不同的核上启动复杂的计算,使用
multiprocessing.Queue 来进行进程间的消息交互。不幸的是,当前的 multiprocessing 实现与 asyncio
不兼容。所以每一个阻塞方法的调用都会阻塞事件循环。但是此时线程正好可以起到帮助作用,因为如果在不同线程里面执行 multiprocessing
的代码,它就不会阻塞主线程。所有我们需要做的就是把所有进程间的通信放到另外一个线程中去。这个例子会解释如何使用这个方法。和上面的多线程例子非常类似,但是我们从线程中创建的是一个新的进程。
def&game_loop(asyncio_loop):&&&&&#&coroutine&to&run&in&main&thread&&&&&async&def&notify():&&&&&&&&&await&tick.acquire()&&&&&&&&&tick.notify_all()&&&&&&&&&tick.release()&&&&&queue&=&Queue()&&&&&#&function&to&run&in&a&different&process&&&&&def&worker():&&&&&&&&&while&1:&&&&&&&&&&&&&print(&doing&heavy&calculation&in&process&{}&.format(os.getpid()))&&&&&&&&&&&&&sleep(1)&&&&&&&&&&&&&queue.put(&calculation&result&)&&&&&Process(target=worker).start()&&&&&while&1:&&&&&&&&&#&blocks&this&thread&but&not&main&thread&with&event&loop&&&&&&&&&result&=&queue.get()&&&&&&&&&print(&getting&{}&in&process&{}&.format(result,&os.getpid()))&&&&&&&&&task&=&asyncio.run_coroutine_threadsafe(notify(),&asyncio_loop)&&&&&&&&&task.result()&
这里我们在另外一个进程中运行 worker() 函数。它包括一个执行复杂计算并把计算结果放到 queue 中的循环,这个 queue 是
multiprocessing.Queue 的实例。然后我们就可以在另外一个线程的主事件循环中获取结果并通知客户端,就和例子 3.5
一样。这个例子已经非常简化了,它没有合理的结束进程。而且在真实的游戏中,我们可能需要另外一个队列来将数据传递给 worker。
有一个项目叫 aioprocessing,它封装了 multiprocessing,使得它可以和 asyncio
兼容。但是实际上它只是和上面例子使用了完全一样的方法:从线程中创建进程。它并没有给你带来任何方便,除了它使用了简单的接口隐藏了后面的这些技巧。希望在
Python 的下一个版本中,我们能有一个基于协程且支持 asyncio 的 multiprocessing 库。
注意!如果你从主线程或者主进程中创建了一个不同的线程或者子进程来运行另外一个 asyncio 事件循环,你需要显式地使用
asyncio.new_event_loop() 来创建循环,不然的话可能程序不会正常工作。
【编辑推荐】
【责任编辑: TEL:(010)】
大家都在看猜你喜欢
关注头条热点头条热点
24H热文一周话题本月最赞
讲师:738169人学习过
讲师:12584人学习过
讲师:15521人学习过
精选博文论坛热帖下载排行
本书分为8章。第1章主要对XML做了简单的介绍。第2章详细讲解规范的XML文件。第3章主要讲解有效的XML文件,特别重点讲解DTD文件。第4章讲解C...
订阅51CTO邮刊}

我要回帖

更多关于 生活中的不便 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信