1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
| # -*- coding: utf-8 -*-
# 导入相关的库和模块 from iFinDPy import * from datetime import datetime import pandas as pd import time as _time import json from threading import Thread, Lock, Semaphore import requests
# 创建一个信号量,用于控制最大并发数为5 sem = Semaphore(5) # 创建一个锁,用于控制实时行情推送中数据写入到本地的过程 dllock = Lock()
# 登录函数 # 定义登录函数 def thslogindemo(): # 使用给定的用户名和密码进行登录 # thsLogin = THS_iFinDLogin('账号', '密码') print(thsLogin) # 根据登录结果,判断是否登录成功 if thsLogin != 0: print('登录失败') else: print('登录成功')
# 定义函数,提取沪深300的全部股票在2020-11-16日的日不复权收盘价 def datepool_basicdata_demo(): # 通过数据池的板块成分函数和基础数据函数,提取沪深300的全部股票在2020-11-16日的日不复权收盘价 data_hs300 = THS_DP('block', '2020-11-16;001005290', 'date:Y,thscode:Y,security_name:Y') if data_hs300.errorcode != 0: print('error:{}'.format(data_hs300.errmsg)) else: seccode_hs300_list = data_hs300.data['THSCODE'].tolist() data_result = THS_BD(seccode_hs300_list, 'ths_close_price_stock', '2020-11-16,100') if data_result.errorcode != 0: print('error:{}'.format(data_result.errmsg)) else: data_df = data_result.data print(data_df)
# 定义函数,提取上证50的全部股票的最新价数据,并将其导出为csv文件 def datapool_realtime_demo(): # 通过数据池的板块成分函数和实时行情函数,提取上证50的全部股票的最新价数据,并将其导出为csv文件 today_str = datetime.today().strftime('%Y-%m-%d') print('today:{}'.format(today_str)) data_sz50 = THS_DP('block', '{};001005260'.format(today_str), 'date:Y,thscode:Y,security_name:Y') if data_sz50.errorcode != 0: print('error:{}'.format(data_sz50.errmsg)) else: seccode_sz50_list = data_sz50.data['THSCODE'].tolist() data_result = THS_RQ(seccode_sz50_list, 'latest') if data_result.errorcode != 0: print('error:{}'.format(data_result.errmsg)) else: data_df = data_result.data print(data_df) data_df.to_csv('realtimedata_{}.csv'.format(today_str))
# 定义函数,演示如何通过不消耗流量的自然语言语句调用常用数据 def iwencai_demo(): print('输出资金流向数据') data_wencai_zjlx = THS_WC('主力资金流向', 'stock') if data_wencai_zjlx.errorcode != 0: print('error:{}'.format(data_wencai_zjlx.errmsg)) else: print(data_wencai_zjlx.data)
print('输出股性评分数据') data_wencai_xny = THS_WC('股性评分', 'stock') if data_wencai_xny.errorcode != 0: print('error:{}'.format(data_wencai_xny.errmsg)) else: print(data_wencai_xny.data)
# 定义函数,处理实时行情推送数据 def dlwork(tick_data): # 本函数为实时行情订阅新启线程的任务函数 dllock.acquire() with open('dlwork.txt', 'a') as f: for stock_data in tick_data['tables']: if 'time' in stock_data: timestr = _time.strftime('%Y-%m-%d %H:%M:%S', _time.localtime(stock_data['time'][0])) print(timestr) f.write(timestr + str(stock_data) + '\n') else: pass dllock.release()
# 定义函数,提取数据并写入到文件 def work(codestr, lock, indilist): sem.acquire() stockdata = THS_HF(codestr, ';'.join(indilist), '', '2020-08-11 09:15:00', '2020-08-11 15:30:00', 'format:json') if stockdata.errorcode != 0: print('error:{}'.format(stockdata.errmsg)) sem.release() else: print(stockdata.data) lock.acquire() with open('test1.txt', 'a') as f: f.write(str(stockdata.data) + '\n') lock.release() sem.release()
# 定义函数,演示如何使用多线程加速数据提取 def multiThread_demo(): # 本函数为通过高频序列函数,演示如何使用多线程加速数据提取的示例,本例中通过将所有A股分100组,最大线程数量sem进行提取 # 用户可以根据自身场景进行修改 today_str = datetime.today().strftime('%Y-%m-%d') print('today:{}'.format(today_str)) data_alla = THS_DP('block', '{};001005010'.format(today_str), 'date:Y,thscode:Y,security_name:Y') if data_alla.errorcode != 0: print('error:{}'.format(data_alla.errmsg)) else: stock_list = data_alla.data['THSCODE'].tolist()
indi_list = ['close', 'high', 'low', 'volume'] lock = Lock()
btime = datetime.now() l = [] for eachlist in [stock_list[i:i + int(len(stock_list) / 10)] for i in range(0, len(stock_list), int(len(stock_list) / 10))]: nowstr = ','.join(eachlist) p = Thread(target=work, args=(nowstr, lock, indi_list)) l.append(p)
for p in l: p.start() for p in l: p.join() etime = datetime.now() print(etime - btime)
# 设置pandas的显示选项 pd.options.display.width = 320 pd.options.display.max_columns = None
# 定义函数,下载满足条件的公告的pdf def reportDownload(): df = THS_ReportQuery('300033.SZ', 'beginrDate:2021-08-01;endrDate:2021-08-31;reportType:901', 'reportDate:Y,thscode:Y,secName:Y,ctime:Y,reportTitle:Y,pdfURL:Y,seq:Y').data print(df) for i in range(len(df)): pdfName = df.iloc[i, 4] + str(df.iloc[i, 6]) + '.pdf' pdfURL = df.iloc[i, 5] r = requests.get(pdfURL) with open(pdfName, 'wb+') as f: f.write(r.content)
def main(): # 本脚本为数据接口通用场景的实例,可以通过取消注释下列示例函数来观察效果
# 登录函数 thslogindemo() # 通过数据池的板块成分函数和基础数据函数,提取沪深300的全部股票在2020-11-16日的日不复权收盘价 datepool_basicdata_demo() # 通过数据池的板块成分函数和实时行情函数,提取上证50的全部股票的最新价数据,并将其导出为csv文件 # datapool_realtime_demo() # 演示如何通过不消耗流量的自然语言语句调用常用数据 # iwencai_demo() # 本函数为通过高频序列函数,演示如何使用多线程加速数据提取的示例,本例中通过将所有A股分100组,最大线程数量sem进行提取 # multiThread_demo() # 本函数演示如何使用公告函数提取满足条件的公告,并下载其pdf # reportDownload()
if __name__ == '__main__': main()
|