python实现多线程行情抓取工具的方法

2019-10-10| 发布者: admin| 查看: |


1,股票列表信息接口



code,代码
name,名称
industry,所属行业
area,地区
pe,市盈率
outstanding,流通股本
totals,总股本
totalAssets,总资产
liquidAssets,流动资产
fixedAssets,固定资产
reserved,公积金
reservedPerShare,每股公积金
esp,每股收益
bvps,每股净资
pb,市净率
timeToMarket,上市日期
undp,未分利润
perundp, 每股未分配
rev,收入同比
profit,利润同比
gpr,毛利率
npr,净利润率
holders,股东人数

import tushare as ts
ts.get_stock_basics

 name industry area pe outstanding totals totalAssets
600606 金丰投资 房产服务 上海 0.00 51832.01 51832.01 744930.44
002285 世联行 房产服务 深圳 71.04 76352.17 76377.60 411595.28
000861 海印股份 房产服务 广东 126.20 83775.50 118413.84 730716.56
000526 银润投资 房产服务 福建 2421.16 9619.50 9619.50 20065.32
000056 深国商 房产服务 深圳 0.00 14305.55 26508.14 787195.94
600895 张江高科 园区开发 上海 171.60 154868.95 154868.95 1771040.38
600736 苏州高新 园区开发 江苏 48.68 105788.15 105788.15 2125485.75
600663 陆家嘴 园区开发 上海 47.63 135808.41 186768.41 4562074.50
600658 电子城 园区开发 北京 19.39 58009.73 58009.73 431300.19
600648 外高桥 园区开发 上海 65.36 81022.34 113534.90 2508100.75
600639 浦东金桥 园区开发 上海 57.28 65664.88 92882.50 1241577.00
600604 市北高新 园区开发 上海 692.87 33352.42 56644.92 329289.50

2,日复权行情接口


提供股票上市以来所有历史数据,默认为前复权,读取后存到本地,作为后续分析的基础


ts.get_h_data #两个日期之间的前复权数据
parameter:
code:string,股票代码 e.g. 600848
start:string,开始日期 format:YYYY-MM-DD 为空时取当前日期
end:string,结束日期 format:YYYY-MM-DD 为空时取去年今日
autype:string,复权类型,qfq-前复权 hfq-后复权 None-不复权,默认为qfq
index:Boolean,是否是大盘指数,默认为False
retry_count : int, 默认3,如遇网络等问题重复执行的次数
pause : int, 默认 0,重复请求数据过程中暂停的秒数,防止请求间隔时间太短出现的问题
return:
date : 交易日期 
open : 开盘价
high : 最高价
close : 收盘价
low : 最低价
volume : 成交量
amount : 成交金额

 open high close low volume amount
2015-03-16 13.27 13.45 13.39 13.00 81212976 1073862784
2015-03-13 13.04 13.38 13.37 13.00 40548836 532739744
2015-03-12 13.29 13.95 13.28 12.96 71505720 962979904
2015-03-11 13.35 13.48 13.15 13.00 59110248 780300736
2015-03-10 13.16 13.67 13.59 12.72 105753088 1393819776
2015-03-09 13.77 14.73 14.13 13.70 139091552 1994454656
2015-03-06 12.17 13.39 13.39 12.17 89486704 1167752960
2015-03-05 12.79 12.80 12.17 12.08 26040832 966927360
2015-03-04 13.96 13.96 13.30 12.58 26636174 1060270720
2015-03-03 12.17 13.10 13.10 12.05 19290366 733336768

废话不多说,直接上代码,


class ThreadRead:
 def __init__:
 用于根据股票代码、需要读取的日期,读取增量的日行情数据,
 :param k8.com凯发真人queue:用于保存需要读取的股票代码、起始日期的列表
 :param out_queue:用于保存需要写入到数据库表的结果集列表
 :return:
 threading.Thread.__init__
 self.queue = queue
 self.out_queue = out_queue
 def run:
 while true:
 item = self.queue.get
 time.sleep
 try:
 df_h_data = ts.get_h_data
 if df_h_data is not None and len 0:
 df_h_data['secucode'] = item['code']
 df_h_data.index.name = 'date'
 print df_h_data.index,item['code'],item['startdate']
 df_h_data['tradeday'] = df_h_data.index.strftime
 self.out_queue.put
 except Exception, e:
 print str
 self.queue.put # 将没有爬取成功的数据放回队列里面去,以便下次重试。
 time.sleep
 continue
 self.queue.task_done

消费者线程,本地存储

class ThreadWrite:
 def __init__:
 :param queue: 某种形式的任务队列,此处为tushare为每个股票返回的最新日复权行情数据
 :param lock: 暂时用连接互斥操作,防止mysql高并发,后续可尝试去掉
 :param db_engine: mysql数据库的连接对象
 :return:no
 threading.Thread.__init__
 self.queue = queue
 self.lock = lock
 self.db_engine = db_engine
 def run:
 while True:
 item = self.queue.get
 self._save_data
 self.queue.task_done
 def _save_data:
 with self.lock:
 try:
 item.to_sql
 except Exception, e: # 如果是新股,则有可能df_h_data是空对象,因此需要跳过此类情况不处理
 print str

from Queue import Queue
stock_queue = Queue
data_queue = Queue
lock = threading.Lock
def main:
 用于测试多线程读取数据
 :return:
 #获取环境变量,取得相应的环境配置,上线时不需要再变更代码
 global stock_queue
 global data_queue
 config=os.getenv
 if config == 'default':
 db_url='mysql+pymysql://root:******@localhost:3306/python charset=utf8mb4'
 else:
 db_url='mysql+pymysql://root:******@localhost:3306/test charset=utf8mb4'
 db_engine = create_engine
 conn = db_engine.connect
 #TODO 增加ts.get_stock_basics报错的处理,如果取不到信息则直接用数据库中的股票代码信息,来获取增量信息
 #TODO 增加一个标志,如果一个股票代码的最新日期不是最新日期,则需标记该代码不需要重新获取数据,即记录该股票更新日期到了最新工作日,
 df = ts.get_stock_basics
 df.to_sql})
 # 计算距离当前日期最大的工作日,以便每日定时更新
 today=time.strftime))
 s1= from cron_tradeday t where flag=1 and t.date ='"+ today+"'")
 selectsql=text
 maxTradeay = conn.execute.first
 # 计算每只股票当前加载的最大工作日期,支持重跑
 s =  from cron_dailyquote t group by secucode ")
 selectsql = text
 result = conn.execute # 执行查询语句
 df_result = pd.DataFrame)
 df_result.columns=['stockcode','max_tradeday']
 df_result.set_index
 # 开始归档前复权历史行情至数据库当中,以便可以方便地计算后续选股模型
 for i in range:#使用3个线程
 t = ThreadRead
 t.setDaemon
 t.start
 for code in set):
 try:
 #如果当前股票已经是最新的行情数据,则直接跳过,方便重跑。
 #print maxTradeay[0],df_result.loc[code].values[1]
 if df_result.loc[code].values[1] == maxTradeay[0]:
 continue
 startdate=getLastNdate
 except Exception, e:
 #如果某只股票没有相关的行情,则默认开始日期为2015年1月1日
 startdate='2015-01-01'
 item={}
 item['code']=code
 item['startdate']=startdate
 stock_queue.put # 生成生产者任务队列
 for i in range:
 t = ThreadWrite
 t.setDaemon
 t.start
 stock_queue.join
 data_queue.join

原本需要2,3个小时才能执行完成的每日复权行情增量落地,有效缩短至了1小时以内,这里线程数并不上越多越好,由于复权行情读的是接口,在高并发情况下会返回HTTP 503服务器过载的错误,另外高并发下可能需要使用IP代理池,下载的时段也需要尝试多个时段进行。初次尝试,如果有更好的方法或者哪里有考虑不周的地方欢迎留言建议或者指正。

总结