• 多线程增量下载K线数据


    准备一份股票列表的CSV文件,文件格式如下

    codenameclosecmvdate_ipo
    300434金石亚药12.89427982959020150424
    300380安硕信息19.31241993416320140128
    688123聚辰股份132.821114087266620191223
    300586美联新材20.34790882138120170104
    300534陇神戎发12.96389465063120160913
    300813泰林生物60.55160529663520200114
    688259创耀科技88.76157821235820220112
    301211亨迪药业30.43177175385520211222
    688099晶晨股份75.53121409395420190808

    范例CSV下载

    程序所在目录下,手工建好3个目录
    data/day
    data/week
    data/month

    导入必要的包

    1. import akshare as ak
    2. import numpy as np
    3. import pandas as pd
    4. import warnings
    5. import time
    6. import datetime as dt
    7. warnings.filterwarnings("ignore")

    读取股票列表

    1. #需要下载的股票列表
    2. #将 yyyymmdd形式的日期字符串,转换为yyyy-mm-dd形式
    3. def f_str2dtstr(x):
    4. try:
    5. return dt.datetime.strptime(x,'%Y%m%d').strftime('%Y-%m-%d')
    6. except:
    7. return np.NaN
    8. df = pd.read_csv('wz.csv',dtype={'code':str,'name':str,'close':float,'cmv':float,'date_ipo':str})
    9. #print(df)
    10. df['date_ipo'] = df['date_ipo'].apply(lambda x: f_str2dtstr(x))
    11. df

     设置线程工作相关参数

    1. import threading
    2. job_per_thread = 15
    3. remainder = len(df) % job_per_thread
    4. count_thread = 0
    5. if remainder == 0:
    6. count_thread = int(len(df)/job_per_thread)
    7. else:
    8. count_thread = int((len(df)-job_per_thread)/job_per_thread) +1
    9. print(job_per_thread) # 每个线程处理多少个股票数据
    10. print(count_thread) # 余数,最后一个线程处理多少个股票数据
    11. print(remainder) # 线程数量

    线程处理函数

    • 参数为线程编号(第X号线程,根据循环的变量来的)
    • 网络读取数据,纠错5次
    • 日线,月线,周线全部下载
    • 增量下载、任意时间下载(对比本地数据,删除本地第一条数据,以新下载的为准)
    1. def proc_get(m):
    2. if remainder == 0:
    3. count_job = job_per_thread
    4. else:
    5. if m == (count_thread-1):
    6. count_job = remainder
    7. else:
    8. count_job = job_per_thread
    9. for i in range(0,count_job):
    10. code = df.iloc[m*job_per_thread+i,0]
    11. date_s = '1970-01-01'
    12. d = pd.DataFrame(data=None,columns=['date','open','high','low','close','volume','turnover'])
    13. p_s = ['weekly','daily', 'monthly']
    14. for k in range(0,len(p_s)):
    15. item_p = p_s[k]
    16. path_file = "./data/"
    17. if item_p == 'daily':
    18. path_file = path_file+"day/"
    19. elif item_p == 'weekly':
    20. path_file = path_file+"week/"
    21. elif item_p == 'monthly':
    22. path_file = path_file+"month/"
    23. else:
    24. break
    25. path_file = path_file+"%s.csv"%(code)
    26. try:
    27. d = pd.read_csv("./data/day/%s.csv"%(code),dtype={'code':str,'code':str,'date':str})
    28. except:
    29. pass
    30. if len(d) > 0:
    31. #print(d.iloc[-1,0])
    32. date_s = d.iloc[-1,0]
    33. date_e = dt.datetime.now().strftime('%Y%m%d')
    34. if dt.datetime.strptime(date_s,'%Y-%m-%d') < dt.datetime.strptime(date_e,'%Y%m%d') :
    35. for j in range(0,5):
    36. try:
    37. a = ak.stock_zh_a_hist(symbol=code,period=item_p,start_date=date_s,end_date=date_e,adjust="qfq")
    38. a = a[['日期','开盘','最高','最低','收盘','成交量','换手率']]
    39. a.columns=['date','open','high','low','close','volume','turnover']
    40. #print(a)
    41. if len(d) > 0:
    42. d = pd.concat([d.iloc[0:-2,:],a],axis=0,ignore_index=True)
    43. else:
    44. d = a
    45. break
    46. except:
    47. print(code)
    48. time.sleep(3)
    49. if len(d) > 0:
    50. d.to_csv(path_file,index=False,date_format="%Y-%m-%d")

    开启多线程 

    1. threads = []
    2. for m in range(0,count_thread):
    3. t = threading.Thread(target=proc_get,args=(m,)) #注意即使一个参数,参数也要有,结尾
    4. #print(m)
    5. #print(t)
    6. threads.append(t)
    7. t.start()
    8. for t in threads:
    9. t.join()

    说明: 

    网络下载数据失败的代码会被打印出来
    一般情况下超过5次的就是真的没数据 
    没数据的一般是一些生成了股票代码但没交易的新股
    得空就更新一下你的K线数据呗,增量挺快的
    数据全部取得是前复权的
    未复权,包含前收盘价的数据可以换一个源取,有兴趣的可以做一下。
    任何疑问 turui@163.net
    当前接口获取的数据,起码比巨宽的强
    以SH000300数据为例,巨宽的数据从2005.04.08才有
    看到这种形状,赶紧入吧,涨停板等着你
    数据

  • 相关阅读:
    数学建模学习(88):飞蛾扑火算法(WFO)寻优
    原型和原型链
    HCL模拟器选路实验案例
    stm32之PWM呼吸灯
    Redis:Redis的数据结构、key的操作命令
    应用性能监测工具(APM)VS数据可观测平台
    数据脱敏的风险量化评估介绍
    大数据系列教程之 Kafka基础
    set维护连续段+线段树:1018T2
    黑马程序员Java数据结构与java算法笔记(1)
  • 原文地址:https://blog.csdn.net/turui/article/details/127892837