Skip to content

Commit

Permalink
✨ feat: 通过akshare来获取证券列表
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-yang-biz committed Mar 24, 2024
1 parent aa5be42 commit 3332dc7
Show file tree
Hide file tree
Showing 13 changed files with 1,146 additions and 301 deletions.
87 changes: 87 additions & 0 deletions omega/core/haystore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import datetime
from typing import List, Tuple

import cfg4py
import clickhouse_connect
import pandas as pd
from clickhouse_connect.driver.client import Client
from coretypes import Frame, FrameType, SecurityType


class Haystore(object):
    """Thin persistence wrapper around a ClickHouse connection.

    Provides helpers to save/load bar (OHLC) data and the security list.
    Table names come from ``HaystoreTbl`` (defined elsewhere in the
    project -- not visible in this module; TODO confirm import).
    """

    def __init__(self):
        # Connection settings come from the cfg4py-managed configuration.
        cfg = cfg4py.get_instance()
        host = cfg.clickhouse.host
        user = cfg.clickhouse.user
        password = cfg.clickhouse.password
        database = cfg.clickhouse.database
        self.client = clickhouse_connect.get_client(
            host=host, username=user, password=password, database=database
        )

    def close(self):
        """Close the ClickHouse connection."""
        self.client.close()

    def save_bars(self, frame_type: FrameType, bars: pd.DataFrame):
        """Save bar (quote) data.

        Args:
            frame_type: frame type of the bars; only 1-minute and daily
                frames are accepted.
            bars: bars to save; must contain symbol, frame, OHLC, volume
                and money columns.
        """
        assert frame_type in [FrameType.DAY, FrameType.MIN1]
        if frame_type == FrameType.DAY:
            table = HaystoreTbl.bars_1d
        else:
            table = HaystoreTbl.bars_1m

        self.client.insert_df(table, bars)

    def save_ashare_list(
        self,
        data: pd.DataFrame,
    ):
        """Save the security (stock/index) list.

        Args:
            data: contains date, code, alias, ipo day, and type
        """
        self.client.insert_df(HaystoreTbl.securities, data)

    def get_bars(
        self, code: str, n: int, frame_type: FrameType, end: datetime.datetime
    ):
        """Load persisted bars from ClickHouse.

        Returns at most ``n`` bars with ``frame`` strictly before ``end``,
        newest first.

        Args:
            code: security code, suffixed with .SZ/.SH
            n: maximum number of records to return
            frame_type: frame type; must be 1-minute or daily
            end: end time of the query (exclusive)
        """
        # Server-side parameter binding uses the {name:Type} form; the
        # table name must be bound as Identifier. The LIMIT clause makes
        # the previously-unused `n` parameter effective.
        sql = (
            "SELECT * FROM {table:Identifier} "
            "WHERE frame < {frame:DateTime} AND symbol = {symbol:String} "
            "ORDER BY frame DESC LIMIT {n:UInt32}"
        )
        params = {
            "table": f"bars_{frame_type.value}",
            "frame": end,
            "symbol": code,
            "n": n,
        }
        return self.client.query_np(sql, parameters=params)

    def query_df(self, sql: str, **params) -> pd.DataFrame:
        """Run an arbitrary query and return the result as a DataFrame."""
        return self.client.query_df(sql, parameters=params)

    def update_factors(self, sec: str, factors: pd.Series):
        """Update adjustment (复权) factors.

        TODO:
            ALTER ... UPDATE is a heavy mutation; see
            https://clickhouse.com/blog/handling-updates-and-deletes-in-clickhouse
            for optimization.

        Args:
            sec: code of the security whose factors are to be updated
            factors: Series indexed by date, with the factor as value
        """
        # The statement text is loop-invariant; only the bound values vary.
        sql = "alter table bars_day update factor = %(v1)s where symbol = %(v2)s and frame = %(v3)s"
        for dt, factor in factors.items():
            self.client.command(sql, {"v1": factor, "v2": sec, "v3": dt})

Empty file added omega/fetchers/__init__.py
Empty file.
113 changes: 113 additions & 0 deletions omega/fetchers/akshare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import datetime

import akshare as ak
import arrow
import pandas as pd
from coretypes import SecurityInfoSchema
from pandera.typing import DataFrame
from pypinyin import Style, lazy_pinyin


def _get_pinyin_initials(hanz: str) -> str:
    """Return the upper-cased pinyin initials of *hanz*.

    Helper used to build the pinyin abbreviation of a security name.
    """
    initials = lazy_pinyin(hanz, style=Style.FIRST_LETTER)
    return "".join(initials).upper()

# Canonical column layout shared by every normalized listing.
_CANON_COLS = ["code", "alias", "initials", "ipo", "exit", "type"]


def _normalize(
    raw: pd.DataFrame,
    rename_map: dict,
    code_suffix,
    sec_type: str = "stock",
) -> pd.DataFrame:
    """Normalize one raw akshare listing into the canonical layout.

    Args:
        raw: frame as returned by akshare.
        rename_map: mapping from akshare column names to canonical names.
        code_suffix: exchange suffix (e.g. ".SH") appended to every code,
            or a callable mapping a bare code to its suffixed form.
        sec_type: security type, "stock" or "index".

    Returns:
        DataFrame with exactly the columns code, alias, initials, ipo,
        exit and type.
    """
    df = raw.rename(rename_map, axis=1)
    if callable(code_suffix):
        df["code"] = df["code"].apply(code_suffix)
    else:
        df["code"] = df["code"].apply(lambda x: x + code_suffix)
    df["initials"] = df["alias"].apply(_get_pinyin_initials)
    df["type"] = sec_type
    # Delisted boards carry an "exit" column via rename_map; active boards
    # get a None placeholder so every frame shares the same schema.
    if "exit" not in df.columns:
        df["exit"] = None
    return df[_CANON_COLS]


def fetch_stock_list() -> DataFrame[SecurityInfoSchema]:
    """Fetch the security (stock and index) list via akshare.

    This takes on the order of seconds, so it should normally be invoked
    from a worker process.
    """
    # Shanghai-listed stocks
    sh = _normalize(
        ak.stock_info_sh_name_code(),
        {"证券简称": "alias", "上市日期": "ipo", "证券代码": "code"},
        ".SH",
    )

    # Delisted Shanghai securities
    sh_delisted = _normalize(
        ak.stock_info_sh_delist(),
        {"公司代码": "code", "公司简称": "alias", "上市日期": "ipo", "暂停上市日期": "exit"},
        ".SH",
    )

    # Shenzhen-listed stocks
    sz = _normalize(
        ak.stock_info_sz_name_code(),
        {"A股代码": "code", "A股简称": "alias", "A股上市日期": "ipo"},
        ".SZ",
    )
    # akshare returns the Shenzhen date column as str; convert to date.
    sz["ipo"] = sz["ipo"].apply(lambda x: arrow.get(x).date())

    # Delisted Shenzhen securities
    sz_delisted = _normalize(
        ak.stock_info_sz_delist("终止上市公司"),
        {"证券代码": "code", "证券简称": "alias", "上市日期": "ipo", "终止上市日期": "exit"},
        ".SZ",
    )

    # Beijing Stock Exchange
    bj = _normalize(
        ak.stock_info_bj_name_code(),
        {"证券代码": "code", "证券简称": "alias", "上市日期": "ipo"},
        ".BJ",
    )

    # Index list. Codes starting with "000" are treated as SH indexes,
    # everything else as SZ -- TODO confirm this covers all index families.
    index = _normalize(
        ak.index_stock_info(),
        {"index_code": "code", "display_name": "alias", "publish_date": "ipo"},
        lambda x: x + ".SH" if x.startswith("000") else x + ".SZ",
        sec_type="index",
    )
    index["ipo"] = index["ipo"].apply(lambda x: arrow.get(x).date())

    # NOTE: the previous implementation built `bj` but forgot to include
    # it in the result; it is now part of the concatenation.
    return pd.concat(
        [sh, sz, bj, sh_delisted, sz_delisted, index], axis=0
    )  # type: ignore
3 changes: 3 additions & 0 deletions omega/fetchers/tushare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import tushare as ts
from coretypes import SecurityInfoSchema
from pandera.typing import DataFrame
15 changes: 13 additions & 2 deletions omega/master/tasks/rebuild_unclosed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ async def _rebuild_min_level_unclosed_bars():
keys = await cache.security.keys("bars:1m:*")

errors = 0
no_first_bars = 0
for key in keys:
try:
sec = key.split(":")[2]
bars = await Stock._get_cached_bars_n(sec, 240, FrameType.MIN1, end)
first_frame = bars[0]["frame"].item()
if no_first_bars > 50:
logger.warning("超过50支个股在缓存中没有09:31的分钟线,zillionare刚安装,还没来得及同步?")
DingTalkMessage.text("重建分钟级缓存数据时,超过50支个股在缓存中没有09:31的分钟线,zillionare刚安装,还没来得及同步?")
return False
if first_frame.hour() != 9 and first_frame.minute() != 31:
no_first_bars += 1
continue
except Exception as e:
logger.exception(e)
logger.warning("failed to get cached bars for %s", sec)
Expand Down Expand Up @@ -49,6 +58,7 @@ async def _rebuild_min_level_unclosed_bars():

if errors > 0:
DingTalkMessage.text(f"重建分钟级缓存数据时,出现{errors}个错误。")
return True


async def _rebuild_day_level_unclosed_bars():
Expand Down Expand Up @@ -101,5 +111,6 @@ async def rebuild_unclosed_bars():
后续未收盘数据的更新,将在每个分钟线同步完成后,调用lua脚本进行。
"""
await _rebuild_min_level_unclosed_bars()
await _rebuild_day_level_unclosed_bars()
success = await _rebuild_min_level_unclosed_bars()
if success:
await _rebuild_day_level_unclosed_bars()
103 changes: 0 additions & 103 deletions omega/worker/quotes_fetcher.py

This file was deleted.

6 changes: 4 additions & 2 deletions omega/worker/tasks/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,17 @@ async def get_secs_for_sync(limit: int, n_bars: int, name: str):
Returns:
"""
while True:
step = limit // n_bars # 根据 单次api允许获取的条数 和一共多少根k线 计算每次最多可以获取多少个股票的
step = limit // n_bars # 根据单次api允许获取的条数 和一共多少根k线 计算每次最多可以获取多少个股票的
p = cache.sys.pipeline()
p.lrange(name, 0, step - 1)
p.ltrim(name, step, -1)
secs, _ = await p.execute()
if not len(secs):
if len(secs) == 0:
break
logger.info("get %s secs for sync %s", len(secs), name)
yield secs


Expand Down
Loading

0 comments on commit 3332dc7

Please sign in to comment.