Python内置了读写文件的函数,用法和C是兼容的。
>>> f = open('/Users/michael/test.txt', 'r')
标示符’r’表示读,这样,我们就成功地打开了一个文件。如果文件不存在,open()函数就会抛出一个IOError的错误.
>>> f.read()
'Hello, world!'
>>> f.close()
with open('/path/to/file', 'r') as f:
print(f.read())
调用
read()会一次性读取文件的全部内容
调用read(size)方法,每次最多读取size个字节的内容
调用readline()可以每次读取一行内容
调用readlines()一次读取所有内容并按行返回list
要读取二进制文件,比如图片、视频等等,用’rb’模式打开文件
>>> f = open('/Users/michael/test.jpg', 'rb')
>>> f.read()
b'\xff\xd8\xff\xe1\x00\x18Exif\x00\x00...' # 十六进制表示的字节
读取非UTF-8编码的文本文件,需要给open()函数传入encoding参数
>>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk')
>>> f.read()
'测试'
遇到编码错误忽略
>>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore')
写文件和读文件是一样的,唯一区别是调用open()函数时,传入标识符’w’或者’wb’表示写文本文件或写二进制文件
with open('/Users/michael/test.txt', 'w') as f:
f.write('Hello, world!')
f.close()
>>> from io import StringIO
>>> f = StringIO()
>>> f.write('hello')
5
>>> f.write(' ')
1
>>> f.write('world!')
6
>>> print(f.getvalue())
hello world!
>>> from io import BytesIO
>>> f = BytesIO()
>>> f.write('中文'.encode('utf-8'))
6
>>> print(f.getvalue())
b'\xe4\xb8\xad\xe6\x96\x87'
>>> import os
>>> os.name # 操作系统类型
'posix'
如果是posix,说明系统是Linux、Unix或Mac OS X,如果是nt,就是Windows系统。
在操作系统中定义的环境变量,全部保存在os.environ这个变量中
# 查看当前目录的绝对路径:
>>> os.path.abspath('.')
'/Users/michael'
# 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来:
>>> os.path.join('/Users/michael', 'testdir')
'/Users/michael/testdir'
# 然后创建一个目录:
>>> os.mkdir('/Users/michael/testdir')
# 删掉一个目录:
>>> os.rmdir('/Users/michael/testdir')
# 对文件重命名:
>>> os.rename('test.txt', 'test.py')
# 删掉文件:
>>> os.remove('test.py')
把两个路径合成一个时,不要直接拼字符串,而要通过os.path.join()函数
part-1/part-2
要拆分路径时,也不要直接去拆字符串,而要通过os.path.split()函数
>>> os.path.splitext('/path/to/file.txt')
('/path/to/file', '.txt')
Python提供了pickle模块来实现序列化。
>>> import pickle
>>> d = dict(name='Bob', age=20, score=88)
>>> pickle.dumps(d)
b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.'
pickle.dump()直接把对象序列化后写入一个file-like Object:
>>> f = open('dump.txt', 'wb')
>>> pickle.dump(d, f)
>>> f.close()
>>> f = open('dump.txt', 'rb')
>>> d = pickle.load(f)
>>> f.close()
>>> d
{'age': 20, 'score': 88, 'name': 'Bob'}
| JSON类型 | Python类型 |
|---|---|
| {} | dict |
| [] | list |
| “string” | str |
| 1234.56 | int或float |
| true/false | True/False |
| null | None |
>>> import json
>>> d = dict(name='Bob', age=20, score=88)
>>> json.dumps(d)
'{"age": 20, "score": 88, "name": "Bob"}'
>>> json_str = '{"age": 20, "score": 88, "name": "Bob"}'
>>> json.loads(json_str)
{'age': 20, 'score': 88, 'name': 'Bob'}
def student2dict(std):
return {
'name': std.name,
'age': std.age,
'score': std.score
}
import json
class Student(object):
def __init__(self, name, age, score):
self.name = name
self.age = age
self.score = score
s = Student('Bob', 20, 88)
print(json.dumps(s, default=student2dict))
{"age": 20, "name": "Bob", "score": 88}
协程,又称微线程,纤程。英文名Coroutine。协程的特点在于是一个线程执行
多线程比协程有何优势?
Python对协程的支持是通过generator实现的。
协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
结果
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函数是一个generator,把一个consumer传入produce后:
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
用asyncio的异步网络连接来获取sina、sohu和163的网站首页:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
结果
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段时间)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
把@asyncio.coroutine替换为async;
把yield from替换为await。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
安装aiohttp
pip3 install aiohttp
然后编写一个HTTP服务器,分别处理以下URL:
代码如下:
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'Index
')
async def hello(request):
await asyncio.sleep(0.5)
text = 'hello, %s!
' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
注意aiohttp的初始化函数init()也是一个coroutine,loop.create_server()则利用asyncio创建TCP服务。