需求:
在上节课,我们从Intrinio.com下载多支股票的csv数据,并将其转换为xml文件
额外需求:
实现一个线程TarThread，将转换出的xml文件打包。比如转换线程每生产出100个xml文件，就通知打包线程将它们打包成一个xxx.tgz文件，
并删除这些xml文件；打包完成后，打包线程反过来通知转换线程，转换线程再继续转换。（为便于演示，下面的代码每转换3个文件就打包一次。）
思路:
线程间的事件通知，可以使用标准库中的threading.Event：
1、等待事件的一端调用wait()，阻塞直到事件被触发；
2、通知事件的一端调用set()，触发事件；
3、事件被set()后会一直保持触发状态，之后再调用wait()会立即返回，因此需要先调用clear()重置，才能再次等待（见文末的交互示例）。
代码:
import base64
import csv
import os
import tarfile
import time
from io import StringIO
from threading import Thread
from xml.etree.ElementTree import ElementTree, Element, SubElement

import requests
# Intrinio API key. Set INTRINIO_API_KEY in the environment to override it;
# the hard-coded value is only a fallback for the course demo.
apikey = os.environ.get('INTRINIO_API_KEY', 'OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk')


class DownloadThread(Thread):
    """Download one page of AAPL price data as CSV and put it on a queue.

    The result is queued as ``(page_number, csv_file)``, where ``csv_file``
    is an ``io.StringIO`` over the response body. A failed download is
    retried (after a short pause) until it succeeds.

    Args:
        page_number: page of the Intrinio price listing to fetch.
        queue: queue that receives the ``(page_number, csv_file)`` tuple.
    """

    URL = 'https://api.intrinio.com/prices.csv'
    # Seconds to wait for the server on a single attempt; without a timeout
    # requests can block forever on a stalled connection.
    TIMEOUT = 30
    # Pause between retries so a failing endpoint is not hammered in a tight loop.
    RETRY_DELAY = 1

    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        csv_file = None
        while not csv_file:
            csv_file = self.download_csv(self.page_number)
            if not csv_file:
                time.sleep(self.RETRY_DELAY)
        self.queue.put((self.page_number, csv_file))

    def download_csv(self, page_number):
        """Fetch one page of prices.

        Returns:
            ``io.StringIO`` over the CSV body on success, otherwise ``None``.
        """
        print('download csv data [page=%s]' % page_number)
        params = {
            'api_key': apikey,
            'identifier': 'AAPL',
            'page_size': 20,
            'page_number': page_number,
            'start_date': '2017-09-28',
            'end_date': '2020-09-28',
        }
        try:
            response = requests.get(self.URL, params=params, timeout=self.TIMEOUT)
        except requests.RequestException as exc:
            # Treat network errors like a bad response so run() retries,
            # instead of the thread dying and this page being silently lost.
            print('download failed [page=%s]: %s' % (page_number, exc))
            return None
        if response.ok:
            return StringIO(response.text)
        return None
class ConvertThread(Thread):
    """Consume downloaded CSV pages from a queue and convert each to XML.

    After every ``batch_size`` converted files the thread sets ``c_event``
    and blocks on ``t_event`` until the batch has been archived. A queue
    item whose ``page_number`` is ``-1`` is the stop sentinel: the remaining
    (possibly partial) batch is handed over for archiving, then the thread
    exits.

    Args:
        queue: queue of ``(page_number, csv_file)`` tuples, where
            ``csv_file`` is a readable text stream of CSV data.
        c_event: set by this thread when a batch of XML files is ready.
        t_event: set by the archiving thread once the batch is archived.
        batch_size: number of XML files per archive (default 3).

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """

    def __init__(self, queue, c_event, t_event, batch_size=3):
        super().__init__()
        if batch_size < 1:
            raise ValueError('batch_size must be >= 1')
        self.queue = queue
        self.c_event = c_event
        self.t_event = t_event
        self.batch_size = batch_size

    def run(self):
        count = 0
        while True:
            page_number, csv_file = self.queue.get()
            if page_number == -1:
                # Sentinel: archive the last batch even if it holds fewer than
                # batch_size files, and wait for that to finish before exiting.
                self._wait_for_tar()
                break
            count += 1
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)
            if count >= self.batch_size:
                count = 0
                self._wait_for_tar()

    def _wait_for_tar(self):
        """Signal that a batch is ready and block until it has been archived."""
        self.c_event.set()
        self.t_event.wait()
        # Reset, so the next wait() blocks until the *next* batch is archived.
        self.t_event.clear()

    def csv_to_xml(self, csv_file, xml_path):
        """Convert CSV text into an indented XML file.

        The first CSV row supplies the element names. Every following
        non-empty row becomes a ``<Row>`` under the ``<Data>`` root, with one
        child element per column.

        Args:
            csv_file: readable text stream (e.g. ``io.StringIO``) of CSV data.
            xml_path: path of the XML file to write.
        """
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        # An empty CSV has no header row: produce an empty <Data/> document
        # instead of letting StopIteration kill the thread.
        headers = next(reader, [])
        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; they carry no data.
                continue
            row_elem = SubElement(root, 'Row')
            row_elem.text = '\n\t\t'
            row_elem.tail = '\n\t'
            for tag, text in zip(headers, row):
                e = SubElement(row_elem, tag)
                e.text = text
                e.tail = '\n\t\t'
            if len(row_elem):
                # The last column is followed by </Row>, so dedent its tail.
                row_elem[-1].tail = '\n\t'
        if len(root):
            # The last <Row> is followed by </Data>, so dedent its tail.
            root[-1].tail = '\n'
        else:
            root.text = None
        ElementTree(root).write(xml_path, encoding='utf8')
class TarThread(Thread):
    """Archive converted XML files whenever the converter signals a batch.

    Runs as a daemon thread, so it exits automatically once the
    (non-daemon) download and conversion threads have finished.

    Args:
        c_event: set by the converter when a batch of XML files is ready.
        t_event: set by this thread once the batch has been archived.
    """

    def __init__(self, c_event, t_event):
        super().__init__(daemon=True)
        self.count = 0  # archives attempted so far; used for the archive name
        self.c_event = c_event
        self.t_event = t_event

    def run(self):
        while True:
            # Wait for the converter to finish a batch.
            self.c_event.wait()
            # Reset the event so the next wait() blocks until the next batch.
            self.c_event.clear()
            self.tar_xml()
            # Tell the converter it may continue.
            self.t_event.set()

    def tar_xml(self):
        """Pack every ``*.xml`` file in the current directory into ``data<N>.tgz``.

        The XML files are deleted only after the archive has been written
        and closed successfully. If there are no XML files, no archive is
        left behind.
        """
        self.count += 1
        tfname = 'data%s.tgz' % self.count
        print('tar %s...' % tfname)
        # NOTE(review): this picks up every regular .xml file in the CWD, not
        # only the ones ConvertThread wrote.
        xml_files = sorted(
            fname for fname in os.listdir('.')
            if fname.endswith('.xml') and os.path.isfile(fname)
        )
        if not xml_files:
            return  # nothing to archive; don't leave an empty .tgz behind
        # The context manager closes the archive even if add() raises.
        with tarfile.open(tfname, 'w:gz') as tf:
            for fname in xml_files:
                tf.add(fname)
        # Remove sources only once the archive is safely on disk.
        for fname in xml_files:
            os.remove(fname)
# def download_and_save(page_number, xml_path):
# # IO
# csv_file = None
# while not csv_file:
# csv_file = download_csv(page_number)
# # cpu
# csv_to_xml(csv_file, 'data%s.xml' % page_number)
# class MyThread(Thread):
# def __init__(self, page_number, xml_path):
# super().__init__()
# self.page_number = page_number
# self.xml_path = xml_path
#
# def run(self):
# download_and_save(self.page_number, self.xml_path)
from queue import Queue
from threading import Event
if __name__ == '__main__':
    queue = Queue()
    c_event = Event()  # set by ConvertThread: a batch of XML files is ready
    t_event = Event()  # set by TarThread: the batch has been archived

    t0 = time.time()

    # One download thread per page; each puts (page_number, csv_file) on the queue.
    thread_list = []
    for i in range(1, 15):
        t = DownloadThread(i, queue)
        t.start()
        thread_list.append(t)

    convert_thread = ConvertThread(queue, c_event, t_event)
    convert_thread.start()

    # Daemon thread: it is not joined and ends together with the process.
    tar_thread = TarThread(c_event, t_event)
    tar_thread.start()

    for t in thread_list:
        t.join()

    # Every page is now on the queue; the sentinel makes ConvertThread archive
    # the final partial batch and stop.
    queue.put((-1, None))
    # Wait for conversion and the last archive to finish so that the reported
    # time covers the whole pipeline, not just the downloads.
    convert_thread.join()

    print(time.time() - t0)
    print('main thread end.')
=========================================
>>> from threading import Thread,Event
>>> def f(event):
... print('wait event...')
... event.wait()
... print('f end...')
...
>>> e = Event()
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
>>> e.set()
f end...
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
f end...
>>> e.clear()
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
>>>
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。