一、概述
1.1 为什么需要时间校准
在量化投资与金融数据分析领域,数据来源的多样性带来了一个基础但至关重要的问题——时间格式不统一。不同的数据供应商(Tushare、AKShare、Wind、Bloomberg、Yahoo Finance 等)采用不同的时间表示方式,有的使用 Unix 时间戳,有的使用 ISO 8601 字符串,还有的使用交易所特有的紧凑格式。如果不对这些时间进行统一校准,将直接导致以下严重后果:
- 数据合并失败:从不同 API 获取的同一只股票的日线数据,因时间字段格式不同而无法直接进行
merge或join操作。 - 时间排序错乱:混合格式的时间字段在排序时可能产生错误顺序,导致技术指标计算偏差。
- 回测结果失真:在策略回测中,时间对齐错误会导致未来数据泄露(look-ahead bias)或信号延迟。
- 跨市场分析失效:A股(北京时间 CST)、美股(美东时间 EST/EDT)、港股(香港时间 HKT)处于不同时区,缺乏统一时间轴将导致跨市场相关性分析出现系统性偏差。
因此,股市时间校准是所有金融数据处理流程中不可或缺的第一步,其核心目标是将来自不同数据源、不同时区、不同粒度的时间信息,统一映射到一条标准化的时间轴上。
1.2 适用场景
本 Skill 适用于以下典型场景:
| 场景 | 说明 |
|---|---|
| 跨 API 数据整合 | 从多个数据源获取同一标的的数据并合并 |
| 跨市场分析 | 同时分析 A 股、港股、美股等不同市场的数据 |
| 多周期策略 | 在日线、分钟线、Tick 级数据之间切换分析 |
| 历史回测 | 将不同格式的历史数据统一后进行回测 |
| 实时行情处理 | 将实时推送的 Tick 数据标准化后入库 |
| 监控告警 | 基于统一时间轴进行异常检测与告警 |
1.3 支持的时间粒度
| 粒度 | 精度 | 典型用途 |
|---|---|---|
| 日线(Daily) | 天 | 中长期趋势分析、因子研究 |
| 分钟线(Minute) | 分钟 | 日内策略、量价分析 |
| Tick 级 | 毫秒/微秒 | 高频交易、Order Book 重建、微观结构研究 |
不同粒度的时间校准面临不同的挑战:日线数据主要处理日期格式差异和节假日对齐;分钟线数据需要关注交易时段划分(如 A 股的 09:30-11:30、13:00-15:00);Tick 级数据则需处理毫秒精度的时间戳和交易所特有的编码格式。
二、常见时间格式大全
在金融数据工程中, encountered 的时间格式多达数十种。以下逐一列出最常见的 16 种格式,并给出具体示例与解析方法。
2.1 Unix 时间戳(秒级)
格式说明:自 1970-01-01 00:00:00 UTC 起经过的秒数,32 位整数范围可表示至 2038 年。
示例:1700000000
对应时间:2023-11-14 22:13:20 UTC / 2023-11-15 06:13:20 CST
解析方法:
from datetime import datetime, timezone
dt = datetime.fromtimestamp(1700000000, tz=timezone.utc)
# datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc)
常见来源:部分 REST API 的默认返回格式、数据库存储格式。
2.2 Unix 时间戳(毫秒级)
格式说明:秒级时间戳乘以 1000,13 位整数,JavaScript 默认时间格式。
示例:1700000000000
对应时间:同上
解析方法:
dt = datetime.fromtimestamp(1700000000000 / 1000, tz=timezone.utc)
常见来源:JavaScript 前端系统、部分量化平台(如聚宽的内部时间表示)。
2.3 Unix 时间戳(微秒级)
格式说明:秒级时间戳乘以 1,000,000,16 位整数,Python time.time() 的原始精度。
示例:1700000000000000
对应时间:同上
解析方法:
dt = datetime.fromtimestamp(1700000000000000 / 1_000_000, tz=timezone.utc)
常见来源:Python 原生时间函数、高精度日志系统、部分交易所 Level-2 行情接口。
2.4 ISO 8601 基本格式
格式说明:ISO 8601 标准的紧凑表示,去掉所有分隔符,以 T 分隔日期和时间,以 Z 表示 UTC。
示例:20240101T080000Z
对应时间:2024-01-01 08:00:00 UTC
解析方法:
dt = datetime.strptime("20240101T080000Z", "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)
常见来源:部分国际 API(如 Alpha Vantage 的部分接口)、日志文件。
2.5 ISO 8601 扩展格式
格式说明:ISO 8601 的可读表示,使用 - 和 : 作为分隔符。
示例:2024-01-01T08:00:00Z
对应时间:2024-01-01 08:00:00 UTC
解析方法:
dt = datetime.fromisoformat("2024-01-01T08:00:00Z")
# Python 3.11+ 直接支持;低版本需替换 Z 为 +00:00
常见来源:大多数现代 API 的标准输出格式、JSON 数据交换。
2.6 带时区偏移的 ISO 8601
格式说明:在扩展格式基础上附加时区偏移量,+08:00 表示东八区。
示例:2024-01-01T08:00:00+08:00
对应时间:2024-01-01 08:00:00 北京时间 = 2024-01-01 00:00:00 UTC
解析方法:
dt = datetime.fromisoformat("2024-01-01T08:00:00+08:00")
常见来源:国际化金融数据平台、Bloomberg API、FIX 协议消息。
2.7 RFC 3339
格式说明:RFC 3339 是 ISO 8601 的一个严格子集,广泛用于互联网协议(如 HTTP Date、Atom Feed)。
示例:2024-01-01T08:00:00+08:00
对应时间:同 2.6
解析方法:
# RFC 3339 与带偏移的 ISO 8601 解析方式相同
dt = datetime.fromisoformat("2024-01-01T08:00:00+08:00")
常见来源:Web API 响应、OAuth 令牌时间戳、Atom/RSS Feed。
2.8 美式日期格式
格式说明:MM/DD/YYYY HH:MM:SS,月在前日在后,是美国最常用的日期书写习惯。
示例:01/15/2024 09:30:00
对应时间:2024年1月15日 09:30:00(时区需根据上下文确定)
解析方法:
dt = datetime.strptime("01/15/2024 09:30:00", "%m/%d/%Y %H:%M:%S")
常见来源:美国本土数据供应商、NYSE/NASDAQ 官方数据、部分 CSV 导出文件。
注意:此格式与中式日期格式(YYYY/MM/DD)容易混淆,解析时必须明确来源。
2.9 中式日期格式
格式说明:使用中文数字和"年"“月”"日"字符。
示例:2024年01月15日 09:30:00
对应时间:2024年1月15日 09:30:00
解析方法:
dt = datetime.strptime("2024年01月15日 09:30:00", "%Y年%m月%d日 %H:%M:%S")
常见来源:中文研报、财经新闻、部分国内数据平台的可视化输出。
2.10 东方财富/同花顺格式
格式说明:纯数字紧凑格式 YYYYMMDD,无分隔符。
示例:20240115
对应时间:2024年1月15日
解析方法:
dt = datetime.strptime("20240115", "%Y%m%d")
常见来源:东方财富 Choice 数据、同花顺 iFinD、大部分国内财经网站的数据导出。
2.11 Tushare 格式
格式说明:与东方财富格式相同,使用 YYYYMMDD 字符串。
示例:20240115
对应时间:2024年1月15日
解析方法:
dt = datetime.strptime("20240115", "%Y%m%d")
常见来源:Tushare Pro API 的 trade_date 字段。
2.12 Yahoo Finance 格式
格式说明:ISO 风格的日期字符串 YYYY-MM-DD。
示例:2024-01-15
对应时间:2024年1月15日
解析方法:
dt = datetime.strptime("2024-01-15", "%Y-%m-%d")
# 或
dt = datetime.fromisoformat("2024-01-15")
常见来源:Yahoo Finance API(yfinance 库)、Google Finance、大部分国际数据平台。
2.13 聚宽格式
格式说明:YYYY-MM-DD HH:MM:SS,标准 datetime 字符串。
示例:2024-01-01 09:30:00
对应时间:2024年1月1日 09:30:00(北京时间)
解析方法:
dt = datetime.strptime("2024-01-01 09:30:00", "%Y-%m-%d %H:%M:%S")
常见来源:聚宽(JoinQuant)研究平台的默认时间格式。
2.14 Wind 格式
格式说明:带毫秒精度的 datetime 字符串。
示例:2024-01-15 09:30:00.000
对应时间:2024年1月15日 09:30:00.000
解析方法:
dt = datetime.strptime("2024-01-15 09:30:00.000", "%Y-%m-%d %H:%M:%S.%f")
常见来源:Wind 金融终端 API(WSD/WSQ 接口)的返回数据。
2.15 Bloomberg 格式
格式说明:带毫秒精度和时区偏移的 ISO 8601 扩展格式。
示例:2024-01-15T09:30:00.000+08:00
对应时间:2024年1月15日 09:30:00 北京时间
解析方法:
dt = datetime.fromisoformat("2024-01-15T09:30:00.000+08:00")
常见来源:Bloomberg Terminal API(BBG)、Bloomberg Data License、FIX 协议。
2.16 交易所原始 Tick 时间格式(上交所/深交所)
格式说明:交易所 Level-2 行情使用的高度紧凑格式,精确到毫秒,共 17 位数字:YYYYMMDDHHmmssSSS。
示例:20240115093000000
对应时间:2024年1月15日 09:30:00.000
解析方法:
raw = "20240115093000000"
dt = datetime.strptime(raw[:14], "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(raw[14:17]) * 1000)
# datetime(2024, 1, 15, 9, 30, 0, 0)
常见来源:上交所/深交所 Level-2 行情数据、券商极速行情柜台。
三、时区处理
3.1 全球主要交易所时区列表
| 交易所 | 代码 | 时区 | IANA 标识 | 夏令时 |
|---|---|---|---|---|
| 上海证券交易所 | SSE | UTC+8 | Asia/Shanghai | 无 |
| 深圳证券交易所 | SZSE | UTC+8 | Asia/Shanghai | 无 |
| 香港交易所 | HKEX | UTC+8 | Asia/Hong_Kong | 无 |
| 纽约证券交易所 | NYSE | UTC-5/-4 | America/New_York | 有(3月中-11月初) |
| 纳斯达克 | NASDAQ | UTC-5/-4 | America/New_York | 有 |
| 伦敦证券交易所 | LSE | UTC+0/+1 | Europe/London | 有(3月末-10月末) |
| 德国交易所 | XETRA | UTC+1/+2 | Europe/Berlin | 有 |
| 东京证券交易所 | TSE | UTC+9 | Asia/Tokyo | 无 |
| 新加坡交易所 | SGX | UTC+8 | Asia/Singapore | 无 |
| 澳大利亚交易所 | ASX | UTC+10/+11 | Australia/Sydney | 有(10月初-4月初) |
3.2 时区转换规则与夏令时处理
时区转换的核心原则是:所有内部处理统一使用 UTC,仅在展示时转换为本地时间。
夏令时(Daylight Saving Time, DST)的处理是时区转换中最容易出错的环节。以美股为例:
- 标准时间(EST):UTC-5,通常为 11 月初至次年 3 月中旬
- 夏令时间(EDT):UTC-4,通常为 3 月中旬至 11 月初
错误的 DST 处理会导致 1 小时的偏移,在分钟级和 Tick 级数据中这是不可接受的。
推荐做法:始终使用 IANA 时区标识(如 America/New_York)而非固定偏移量(如 UTC-5),让 pytz 或 zoneinfo 库自动处理夏令时切换。
from zoneinfo import ZoneInfo # Python 3.9+
# 正确做法:使用 IANA 标识,自动处理夏令时
ny_tz = ZoneInfo("America/New_York")
dt_utc = datetime(2024, 3, 10, 14, 30, tzinfo=ZoneInfo("UTC"))
dt_ny = dt_utc.astimezone(ny_tz) # 自动判断 EST 或 EDT
# 错误做法:使用固定偏移量,无法处理夏令时
# dt_wrong = dt_utc.replace(tzinfo=timezone(timedelta(hours=-5)))
3.3 UTC 统一时间轴的构建方法
构建 UTC 统一时间轴的步骤如下:
- 解析原始时间:将各种格式的时间字符串/时间戳解析为
datetime对象,同时保留原始时区信息。 - 转换为 UTC:使用
astimezone(ZoneInfo("UTC"))将所有时间统一转换到 UTC。 - 存储为 UTC 时间戳:在数据库或 DataFrame 中以 UTC 时间存储,推荐使用 ISO 8601 扩展格式(带
Z后缀)或 Unix 毫秒时间戳。 - 按需转换展示:在需要展示或与特定交易所对齐时,再从 UTC 转换到目标时区。
from zoneinfo import ZoneInfo
def build_utc_timeline(time_values, source_tz="Asia/Shanghai"):
"""将一组时间值统一转换为 UTC 时间轴"""
utc = ZoneInfo("UTC")
local_tz = ZoneInfo(source_tz)
utc_times = []
for t in time_values:
if t.tzinfo is None:
t = t.replace(tzinfo=local_tz)
utc_t = t.astimezone(utc)
utc_times.append(utc_t)
return utc_times
四、Tick 级时间轴处理
4.1 Tick 数据的特殊时间格式
Tick 数据(逐笔成交数据)是交易所产生的最细粒度行情数据,其时间格式具有以下特点:
上交所 Level-2 格式:17 位数字字符串 YYYYMMDDHHmmssSSS
- 示例:
20240115093015023表示 2024-01-15 09:30:15.023 - 精确到毫秒,最后三位为毫秒数
深交所 Level-2 格式:与上交所类似,但部分字段使用纳秒精度(20 位)
- 示例:
20240115093015023000表示 2024-01-15 09:30:15.023000
上交所/深交所原始报文中的时间字段:
HHMMSSsss(9 位):仅时间部分,需结合日期字段使用- 示例:
093015023表示 09:30:15.023
4.2 Tick 时间戳的解析与标准化
from datetime import datetime, timezone
def parse_sse_tick_time(raw_str, date_str=None):
"""
解析上交所 Tick 时间戳
Args:
raw_str: 原始时间字符串,支持 17 位(YYYYMMDDHHmmssSSS)
或 9 位(HHMMSSsss)格式
date_str: 当 raw_str 为 9 位时需提供日期,格式 YYYYMMDD
Returns:
datetime 对象(无时区信息,默认为交易所本地时间)
"""
raw_str = str(raw_str).strip()
if len(raw_str) == 17:
# 完整格式:YYYYMMDDHHmmssSSS
date_part = raw_str[:8]
time_part = raw_str[8:14]
ms_part = raw_str[14:17]
dt = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(ms_part) * 1000)
return dt
elif len(raw_str) == 9:
# 仅时间格式:HHMMSSsss
if date_str is None:
raise ValueError("9 位格式需要提供 date_str 参数")
time_part = raw_str[:6]
ms_part = raw_str[6:9]
dt = datetime.strptime(f"{date_str}{time_part}", "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(ms_part) * 1000)
return dt
else:
raise ValueError(f"无法识别的 Tick 时间格式: {raw_str}(长度={len(raw_str)})")
4.3 时间序列重采样(Tick → 1 分钟 → 5 分钟 → 日线)
Tick 数据量巨大(单只股票每天可达数十万笔),通常需要重采样为更低频率的 K 线数据。
import pandas as pd
def resample_tick_to_ohlcv(df, freq="1min"):
"""
将 Tick 数据重采样为 OHLCV K 线
Args:
df: DataFrame,必须包含列:datetime, price, volume
freq: 重采样频率,支持 '1min', '5min', '15min', '1h', '1D' 等
Returns:
重采样后的 DataFrame
"""
df = df.copy()
df["datetime"] = pd.to_datetime(df["datetime"])
df = df.set_index("datetime")
ohlcv = df["price"].resample(freq).ohlc()
ohlcv["volume"] = df["volume"].resample(freq).sum()
ohlcv["vwap"] = (df["price"] * df["volume"]).resample(freq).sum() / ohlcv["volume"]
ohlcv = ohlcv.dropna(subset=["open"])
return ohlcv
4.4 时间对齐与插值处理
在多源数据整合时,不同数据源的时间戳往往不完全对齐。常见处理策略包括:
| 策略 | 适用场景 | 说明 |
|---|---|---|
| 前向填充(ffill) | 低频数据补缺 | 用前一个有效值填充缺失时间点 |
| 后向填充(bfill) | 延迟数据补缺 | 用后一个有效值填充缺失时间点 |
| 线性插值 | 连续型数据 | 在两个已知点之间线性插值 |
| 最近邻对齐 | Tick 级数据 | 将每个 Tick 对齐到最近的整分钟时间点 |
| 聚合重采样 | 降频处理 | 将高频数据聚合为低频(如 Tick → 分钟线) |
def align_time_series(df1, df2, freq="1min", method="ffill"):
"""
将两个时间序列对齐到统一的时间网格
Args:
df1, df2: DataFrame,index 为 datetime
freq: 对齐频率
method: 填充方法,'ffill', 'bfill', 'interpolate'
Returns:
对齐后的 (df1_aligned, df2_aligned)
"""
# 创建统一时间网格
all_index = pd.date_range(
start=min(df1.index.min(), df2.index.min()),
end=max(df1.index.max(), df2.index.max()),
freq=freq
)
df1_aligned = df1.reindex(all_index)
df2_aligned = df2.reindex(all_index)
if method == "ffill":
df1_aligned = df1_aligned.ffill()
df2_aligned = df2_aligned.ffill()
elif method == "bfill":
df1_aligned = df1_aligned.bfill()
df2_aligned = df2_aligned.bfill()
elif method == "interpolate":
df1_aligned = df1_aligned.interpolate(method="time")
df2_aligned = df2_aligned.interpolate(method="time")
return df1_aligned, df2_aligned
五、核心代码实现(Python)
以下提供一套完整的时间校准工具函数,可直接集成到量化项目中。
5.1 时间格式自动检测函数
import re
from datetime import datetime, timezone
def detect_time_format(value):
"""
自动检测时间字符串的格式类型
Args:
value: 时间字符串或数值
Returns:
格式名称字符串
"""
s = str(value).strip()
# 纯数字判断
if s.isdigit():
length = len(s)
if length == 10:
return "unix_seconds"
elif length == 13:
return "unix_milliseconds"
elif length == 16:
return "unix_microseconds"
elif length == 8:
return "compact_date" # YYYYMMDD
elif length == 14:
return "compact_datetime" # YYYYMMDDHHmmss
elif length == 17:
return "sse_tick" # YYYYMMDDHHmmssSSS
elif length == 20:
return "szse_tick" # YYYYMMDDHHmmssSSSsss
else:
return "unknown_numeric"
# 中文日期格式
if re.match(r'^\d{4}年\d{1,2}月\d{1,2}日', s):
return "chinese_date"
# ISO 8601 带时区偏移
if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?[+-]\d{2}:\d{2}$', s):
return "iso8601_offset"
# ISO 8601 带 Z(UTC)
if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$', s):
return "iso8601_utc"
# ISO 8601 基本格式
if re.match(r'^\d{8}T\d{6}Z$', s):
return "iso8601_basic"
# 美式日期 MM/DD/YYYY
if re.match(r'^\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2}$', s):
return "us_date"
# Yahoo Finance YYYY-MM-DD
if re.match(r'^\d{4}-\d{2}-\d{2}$', s):
return "yahoo_date"
# Wind 格式(带毫秒)
if re.match(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d{3}$', s):
return "wind_datetime"
# 标准日期时间 YYYY-MM-DD HH:MM:SS
if re.match(r'^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}$', s):
return "standard_datetime"
# Bloomberg 格式
if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{2}:\d{2}$', s):
return "bloomberg_datetime"
return "unknown"
5.2 统一转换函数
from zoneinfo import ZoneInfo
def parse_to_datetime(value, default_tz="Asia/Shanghai"):
"""
将任意格式的时间值统一解析为带时区的 datetime 对象(UTC)
Args:
value: 时间值,支持字符串、整数、float
default_tz: 当时间值不含时区信息时的默认时区
Returns:
带时区信息的 datetime 对象(统一转为 UTC)
"""
fmt = detect_time_format(value)
s = str(value).strip()
local_tz = ZoneInfo(default_tz)
utc_tz = ZoneInfo("UTC")
if fmt == "unix_seconds":
dt = datetime.fromtimestamp(int(s), tz=utc_tz)
elif fmt == "unix_milliseconds":
dt = datetime.fromtimestamp(int(s) / 1000, tz=utc_tz)
elif fmt == "unix_microseconds":
dt = datetime.fromtimestamp(int(s) / 1_000_000, tz=utc_tz)
elif fmt == "compact_date":
dt = datetime.strptime(s, "%Y%m%d").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "compact_datetime":
dt = datetime.strptime(s, "%Y%m%d%H%M%S").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "sse_tick":
dt = parse_sse_tick_time(s)
dt = dt.replace(tzinfo=local_tz).astimezone(utc_tz)
elif fmt == "szse_tick":
date_part = s[:8]
time_part = s[8:14]
ms_part = s[14:20]
dt = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(ms_part))
dt = dt.replace(tzinfo=local_tz).astimezone(utc_tz)
elif fmt == "chinese_date":
# 移除中文标点后解析
clean = s.replace("年", "-").replace("月", "-").replace("日", "")
dt = datetime.strptime(clean.strip(), "%Y-%m-%d %H:%M:%S").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "iso8601_offset":
dt = datetime.fromisoformat(s)
dt = dt.astimezone(utc_tz)
elif fmt == "iso8601_utc":
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
elif fmt == "iso8601_basic":
dt = datetime.strptime(s, "%Y%m%dT%H%M%SZ").replace(tzinfo=utc_tz)
elif fmt == "us_date":
dt = datetime.strptime(s, "%m/%d/%Y %H:%M:%S").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "yahoo_date":
dt = datetime.strptime(s, "%Y-%m-%d").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "wind_datetime":
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "standard_datetime":
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=local_tz)
dt = dt.astimezone(utc_tz)
elif fmt == "bloomberg_datetime":
dt = datetime.fromisoformat(s)
dt = dt.astimezone(utc_tz)
else:
raise ValueError(f"无法解析的时间格式: {s}(检测到格式: {fmt})")
return dt
def to_standard_format(value, default_tz="Asia/Shanghai", output_tz="Asia/Shanghai"):
"""
将任意格式的时间值转换为标准输出格式
Args:
value: 时间值
default_tz: 输入默认时区
output_tz: 输出时区
Returns:
标准格式字符串:YYYY-MM-DD HH:MM:SS+TZ
"""
dt_utc = parse_to_datetime(value, default_tz=default_tz)
target_tz = ZoneInfo(output_tz)
dt_local = dt_utc.astimezone(target_tz)
return dt_local.strftime("%Y-%m-%d %H:%M:%S") + dt_local.strftime("%z")
5.3 时区转换函数
def convert_timezone(dt, from_tz, to_tz):
"""
时区转换函数
Args:
dt: datetime 对象(可以有时区也可以无时区)
from_tz: 源时区 IANA 标识
to_tz: 目标时区 IANA 标识
Returns:
转换后的 datetime 对象
"""
src_tz = ZoneInfo(from_tz)
dst_tz = ZoneInfo(to_tz)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=src_tz)
return dt.astimezone(dst_tz)
def get_exchange_time(utc_dt, exchange):
"""
将 UTC 时间转换为指定交易所的本地时间
Args:
utc_dt: UTC 时间的 datetime 对象
exchange: 交易所代码,如 'SSE', 'NYSE', 'HKEX', 'LSE' 等
Returns:
交易所本地时间的 datetime 对象
"""
tz_map = {
"SSE": "Asia/Shanghai",
"SZSE": "Asia/Shanghai",
"HKEX": "Asia/Hong_Kong",
"NYSE": "America/New_York",
"NASDAQ": "America/New_York",
"LSE": "Europe/London",
"XETRA": "Europe/Berlin",
"TSE": "Asia/Tokyo",
"SGX": "Asia/Singapore",
"ASX": "Australia/Sydney",
}
if exchange.upper() not in tz_map:
raise ValueError(f"不支持的交易所: {exchange},支持的交易所: {list(tz_map.keys())}")
return convert_timezone(utc_dt, "UTC", tz_map[exchange.upper()])
5.4 Tick 时间戳解析函数
def parse_tick_timestamp(raw_value, date_str=None, exchange="SSE"):
"""
通用 Tick 时间戳解析函数
Args:
raw_value: 原始 Tick 时间值
date_str: 日期字符串(当 raw_value 仅包含时间部分时使用)
exchange: 交易所代码
Returns:
带 UTC 时区信息的 datetime 对象
"""
s = str(raw_value).strip()
tz_map = {
"SSE": "Asia/Shanghai",
"SZSE": "Asia/Shanghai",
"HKEX": "Asia/Hong_Kong",
"NYSE": "America/New_York",
}
local_tz = ZoneInfo(tz_map.get(exchange.upper(), "Asia/Shanghai"))
if len(s) == 17:
# 上交所/深交所完整格式
dt = parse_sse_tick_time(s)
elif len(s) == 9:
# 仅时间部分
if date_str is None:
raise ValueError("9 位格式需要提供 date_str")
dt = parse_sse_tick_time(s, date_str=date_str)
elif len(s) == 20:
# 深交所纳秒格式
date_part = s[:8]
time_part = s[8:14]
ns_part = s[14:20]
dt = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(ns_part[:3]) * 1000)
else:
# 尝试通用解析
dt = parse_to_datetime(s, default_tz=tz_map.get(exchange.upper(), "Asia/Shanghai"))
return dt
dt = dt.replace(tzinfo=local_tz)
return dt.astimezone(ZoneInfo("UTC"))
5.5 时间序列重采样函数
def resample_ohlcv(df, time_col="datetime", price_col="price",
volume_col="volume", freq="1min"):
"""
通用时间序列重采样函数
Args:
df: 原始 DataFrame
time_col: 时间列名
price_col: 价格列名
volume_col: 成交量列名
freq: 目标频率(Pandas 频率字符串)
Returns:
包含 OHLCV 数据的 DataFrame
"""
df = df.copy()
df[time_col] = pd.to_datetime(df[time_col])
df = df.set_index(time_col)
result = pd.DataFrame()
result["open"] = df[price_col].resample(freq).first()
result["high"] = df[price_col].resample(freq).max()
result["low"] = df[price_col].resample(freq).min()
result["close"] = df[price_col].resample(freq).last()
result["volume"] = df[volume_col].resample(freq).sum()
# 计算成交量加权平均价
vwap_numerator = (df[price_col] * df[volume_col]).resample(freq).sum()
result["vwap"] = vwap_numerator / result["volume"]
# 计算成交笔数
result["count"] = df[price_col].resample(freq).count()
result = result.dropna(subset=["open"])
return result
5.6 数据校验与清洗函数
def validate_time_series(df, time_col="datetime", expected_freq=None,
start_date=None, end_date=None):
"""
时间序列数据校验函数
Args:
df: 待校验的 DataFrame
time_col: 时间列名
expected_freq: 预期频率(如 'B' 代表工作日,'D' 代表日历日)
start_date: 预期起始日期
end_date: 预期结束日期
Returns:
dict: 包含校验结果和问题列表
"""
report = {"passed": True, "issues": [], "stats": {}}
df = df.copy()
df[time_col] = pd.to_datetime(df[time_col])
# 检查 1:是否有空值
null_count = df[time_col].isnull().sum()
report["stats"]["null_count"] = null_count
if null_count > 0:
report["issues"].append(f"时间列存在 {null_count} 个空值")
report["passed"] = False
# 检查 2:是否有重复时间
dup_count = df[time_col].duplicated().sum()
report["stats"]["duplicate_count"] = dup_count
if dup_count > 0:
report["issues"].append(f"时间列存在 {dup_count} 个重复值")
report["passed"] = False
# 检查 3:时间是否单调递增
is_sorted = df[time_col].is_monotonic_increasing
report["stats"]["is_sorted"] = is_sorted
if not is_sorted:
report["issues"].append("时间列未按升序排列")
report["passed"] = False
# 检查 4:时间范围
actual_start = df[time_col].min()
actual_end = df[time_col].max()
report["stats"]["start_date"] = str(actual_start)
report["stats"]["end_date"] = str(actual_end)
report["stats"]["total_records"] = len(df)
if start_date and actual_start > pd.Timestamp(start_date):
report["issues"].append(f"数据起始时间 {actual_start} 晚于预期 {start_date}")
report["passed"] = False
if end_date and actual_end < pd.Timestamp(end_date):
report["issues"].append(f"数据结束时间 {actual_end} 早于预期 {end_date}")
report["passed"] = False
# 检查 5:缺失值检测(基于预期频率)
if expected_freq:
full_range = pd.date_range(start=actual_start, end=actual_end, freq=expected_freq)
missing = full_range.difference(df[time_col])
report["stats"]["missing_count"] = len(missing)
if len(missing) > 0:
report["issues"].append(f"按 {expected_freq} 频率检测到 {len(missing)} 个缺失时间点")
if len(missing) <= 20:
report["stats"]["missing_dates"] = [str(d) for d in missing]
return report
def clean_time_series(df, time_col="datetime", sort=True, drop_duplicates=True):
"""
时间序列数据清洗函数
Args:
df: 原始 DataFrame
time_col: 时间列名
sort: 是否按时间排序
drop_duplicates: 是否删除重复时间记录
Returns:
清洗后的 DataFrame
"""
df = df.copy()
df[time_col] = pd.to_datetime(df[time_col])
# 删除空值
df = df.dropna(subset=[time_col])
if drop_duplicates:
df = df.drop_duplicates(subset=[time_col], keep="last")
if sort:
df = df.sort_values(time_col).reset_index(drop=True)
return df
六、API 适配层设计
6.1 统一时间适配接口
设计一个抽象基类,所有 API 适配器都实现统一接口,从而屏蔽各 API 之间的时间格式差异。
from abc import ABC, abstractmethod
from typing import Optional
class TimeAdapter(ABC):
"""时间适配器抽象基类"""
@abstractmethod
def get_name(self) -> str:
"""返回适配器名称"""
pass
@abstractmethod
def parse_time(self, raw_value) -> datetime:
"""将 API 原始时间格式解析为 UTC datetime"""
pass
@abstractmethod
def format_time(self, dt: datetime) -> str:
"""将 UTC datetime 格式化为 API 所需的时间字符串"""
pass
@abstractmethod
def get_time_fields(self) -> dict:
"""返回该 API 的时间字段映射表"""
pass
@abstractmethod
def get_default_timezone(self) -> str:
"""返回该 API 的默认时区"""
pass
6.2 各 API 的时间字段映射表
| API | 时间字段名 | 原始格式 | 默认时区 | 示例值 |
|---|---|---|---|---|
| Tushare | trade_date | YYYYMMDD | Asia/Shanghai | 20240115 |
| Tushare (分钟) | trade_time | YYYYMMDD HH:MM:SS | Asia/Shanghai | 20240115 09:30:00 |
| AKShare | date | YYYY-MM-DD | Asia/Shanghai | 2024-01-15 |
| Yahoo Finance | timestamp | Unix 毫秒 | America/New_York | 1705300200000 |
| Alpha Vantage | timestamp | YYYY-MM-DD | America/New_York | 2024-01-15 |
| Wind | DATETIME | YYYY-MM-DD HH:MM:SS.000 | Asia/Shanghai | 2024-01-15 09:30:00.000 |
| Bloomberg | date/time | ISO 8601+TZ | 依数据类型 | 2024-01-15T09:30:00.000+08:00 |
| 聚宽 | time | YYYY-MM-DD HH:MM:SS | Asia/Shanghai | 2024-01-01 09:30:00 |
| 东方财富 | date | YYYYMMDD | Asia/Shanghai | 20240115 |
| 同花顺 | date | YYYYMMDD | Asia/Shanghai | 20240115 |
6.3 适配器模式代码实现
class TushareAdapter(TimeAdapter):
"""Tushare API 时间适配器"""
def get_name(self):
return "Tushare"
def get_default_timezone(self):
return "Asia/Shanghai"
def parse_time(self, raw_value):
s = str(raw_value).strip()
if len(s) == 8:
dt = datetime.strptime(s, "%Y%m%d")
elif len(s) == 17:
dt = datetime.strptime(s[:14], "%Y%m%d%H%M%S")
dt = dt.replace(microsecond=int(s[14:17]) * 1000)
else:
dt = datetime.strptime(s, "%Y%m%d %H:%M:%S")
return dt.replace(tzinfo=ZoneInfo(self.get_default_timezone())).astimezone(ZoneInfo("UTC"))
def format_time(self, dt):
local_dt = dt.astimezone(ZoneInfo(self.get_default_timezone()))
return local_dt.strftime("%Y%m%d")
def get_time_fields(self):
return {
"daily": "trade_date",
"minute": "trade_time",
"format": "YYYYMMDD",
}
class YahooFinanceAdapter(TimeAdapter):
"""Yahoo Finance API 时间适配器"""
def get_name(self):
return "YahooFinance"
def get_default_timezone(self):
return "America/New_York"
def parse_time(self, raw_value):
if isinstance(raw_value, (int, float)):
dt = datetime.fromtimestamp(raw_value / 1000, tz=ZoneInfo("UTC"))
else:
s = str(raw_value).strip()
if len(s) == 10:
dt = datetime.strptime(s, "%Y-%m-%d")
dt = dt.replace(tzinfo=ZoneInfo(self.get_default_timezone()))
dt = dt.astimezone(ZoneInfo("UTC"))
else:
dt = datetime.fromisoformat(s)
dt = dt.astimezone(ZoneInfo("UTC"))
return dt
def format_time(self, dt):
local_dt = dt.astimezone(ZoneInfo(self.get_default_timezone()))
return local_dt.strftime("%Y-%m-%d")
def get_time_fields(self):
return {
"daily": "timestamp",
"format": "unix_ms_or_date",
}
class AKShareAdapter(TimeAdapter):
"""AKShare API 时间适配器"""
def get_name(self):
return "AKShare"
def get_default_timezone(self):
return "Asia/Shanghai"
def parse_time(self, raw_value):
s = str(raw_value).strip()
if "-" in s:
dt = datetime.fromisoformat(s)
else:
dt = datetime.strptime(s, "%Y%m%d")
return dt.replace(tzinfo=ZoneInfo(self.get_default_timezone())).astimezone(ZoneInfo("UTC"))
def format_time(self, dt):
local_dt = dt.astimezone(ZoneInfo(self.get_default_timezone()))
return local_dt.strftime("%Y-%m-%d")
def get_time_fields(self):
return {
"daily": "date",
"format": "YYYY-MM-DD",
}
class WindAdapter(TimeAdapter):
"""Wind API 时间适配器"""
def get_name(self):
return "Wind"
def get_default_timezone(self):
return "Asia/Shanghai"
def parse_time(self, raw_value):
s = str(raw_value).strip()
if "." in s and "T" not in s:
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")
elif "T" in s:
dt = datetime.fromisoformat(s)
else:
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
return dt.replace(tzinfo=ZoneInfo(self.get_default_timezone())).astimezone(ZoneInfo("UTC"))
def format_time(self, dt):
local_dt = dt.astimezone(ZoneInfo(self.get_default_timezone()))
return local_dt.strftime("%Y-%m-%d %H:%M:%S.000")
def get_time_fields(self):
return {
"daily": "DATETIME",
"minute": "DATETIME",
"format": "YYYY-MM-DD HH:MM:SS.000",
}
class BloombergAdapter(TimeAdapter):
"""Bloomberg API 时间适配器"""
def get_name(self):
return "Bloomberg"
def get_default_timezone(self):
return "UTC" # Bloomberg 通常返回 UTC
def parse_time(self, raw_value):
s = str(raw_value).strip()
dt = datetime.fromisoformat(s)
return dt.astimezone(ZoneInfo("UTC"))
def format_time(self, dt):
utc_dt = dt.astimezone(ZoneInfo("UTC"))
return utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000+00:00")
def get_time_fields(self):
return {
"daily": "date",
"intraday": "time",
"format": "ISO8601_with_offset",
}
# 适配器工厂
class AdapterFactory:
"""时间适配器工厂"""
_adapters = {
"tushare": TushareAdapter,
"yahoo": YahooFinanceAdapter,
"akshare": AKShareAdapter,
"wind": WindAdapter,
"bloomberg": BloombergAdapter,
}
@classmethod
def get_adapter(cls, api_name: str) -> TimeAdapter:
"""根据 API 名称获取对应的适配器"""
api_name_lower = api_name.lower()
if api_name_lower not in cls._adapters:
raise ValueError(f"不支持的 API: {api_name},支持: {list(cls._adapters.keys())}")
return cls._adapters[api_name_lower]()
@classmethod
def register_adapter(cls, api_name: str, adapter_class: type):
"""注册自定义适配器"""
cls._adapters[api_name.lower()] = adapter_class
def normalize_dataframe(df, api_name, time_col=None):
"""
使用适配器将 DataFrame 的时间列统一为 UTC
Args:
df: 原始 DataFrame
api_name: API 名称
time_col: 时间列名(若为 None 则自动检测)
Returns:
添加了 'utc_time' 列的 DataFrame
"""
adapter = AdapterFactory.get_adapter(api_name)
fields = adapter.get_time_fields()
if time_col is None:
time_col = fields.get("daily", "date")
df = df.copy()
df["utc_time"] = df[time_col].apply(adapter.parse_time)
return df
七、实战案例
7.1 多 API 数据整合案例
以下示例演示如何同时从 Tushare 和 AKShare 获取贵州茅台(600519)的日线数据并合并。
import pandas as pd
# ===== 模拟数据(实际使用时替换为真实 API 调用) =====
# Tushare 格式数据
tushare_data = pd.DataFrame({
"trade_date": ["20240115", "20240116", "20240117", "20240118", "20240119"],
"open": [1680.0, 1695.0, 1702.0, 1698.0, 1710.0],
"close": [1695.0, 1700.0, 1698.0, 1710.0, 1725.0],
"volume": [25000, 28000, 22000, 30000, 35000],
"source": ["tushare"] * 5,
})
# AKShare 格式数据
akshare_data = pd.DataFrame({
"date": ["2024-01-15", "2024-01-16", "2024-01-17", "2024-01-18", "2024-01-19"],
"open": [1680.0, 1695.0, 1702.0, 1698.0, 1710.0],
"close": [1695.0, 1700.0, 1698.0, 1710.0, 1725.0],
"volume": [25000, 28000, 22000, 30000, 35000],
"source": ["akshare"] * 5,
})
# ===== 步骤 1:使用适配器标准化时间 =====
tushare_normalized = normalize_dataframe(tushare_data, "tushare", time_col="trade_date")
akshare_normalized = normalize_dataframe(akshare_data, "akshare", time_col="date")
# ===== 步骤 2:提取日期用于合并 =====
tushare_normalized["date_key"] = tushare_normalized["utc_time"].dt.date
akshare_normalized["date_key"] = akshare_normalized["utc_time"].dt.date
# ===== 步骤 3:合并数据 =====
merged = pd.merge(
tushare_normalized[["date_key", "open", "close", "volume", "source"]],
akshare_normalized[["date_key", "open", "close", "volume", "source"]],
on="date_key",
suffixes=("_tushare", "_akshare"),
how="outer"
)
print("合并结果:")
print(merged.to_string(index=False))
# 输出:两列数据完美对齐,date_key 列为统一日期
7.2 Tick 数据时间轴重建案例
以下示例演示如何将上交所原始 Tick 数据重建为标准化的分钟级 K 线。
import numpy as np
# ===== 模拟上交所 Tick 数据 =====
np.random.seed(42)
base_dt = datetime(2024, 1, 15, 9, 30, 0)
tick_data = []
price = 1680.0
for i in range(500):
# 模拟 Tick 时间(每笔间隔 0.1~2 秒)
interval_ms = np.random.randint(100, 2000)
tick_dt = base_dt + pd.Timedelta(milliseconds=interval_ms)
base_dt = tick_dt
# 模拟价格变动
price += np.random.normal(0, 0.5)
volume = np.random.randint(1, 100)
# 生成上交所格式时间戳
tick_str = tick_dt.strftime("%Y%m%d%H%M%S") + f"{tick_dt.microsecond // 1000:03d}"
tick_data.append({
"tick_time": tick_str,
"price": round(price, 2),
"volume": volume,
})
df_ticks = pd.DataFrame(tick_data)
# ===== 步骤 1:解析 Tick 时间 =====
df_ticks["datetime"] = df_ticks["tick_time"].apply(
lambda x: parse_tick_timestamp(x, exchange="SSE")
)
# ===== 步骤 2:重采样为 1 分钟 K 线 =====
df_ticks["datetime"] = pd.to_datetime(df_ticks["datetime"])
df_ticks = df_ticks.set_index("datetime")
ohlcv_1min = pd.DataFrame()
ohlcv_1min["open"] = df_ticks["price"].resample("1min").first()
ohlcv_1min["high"] = df_ticks["price"].resample("1min").max()
ohlcv_1min["low"] = df_ticks["price"].resample("1min").min()
ohlcv_1min["close"] = df_ticks["price"].resample("1min").last()
ohlcv_1min["volume"] = df_ticks["volume"].resample("1min").sum()
ohlcv_1min = ohlcv_1min.dropna()
print("\n1 分钟 K 线(前 5 条):")
print(ohlcv_1min.head().to_string())
# ===== 步骤 3:进一步重采样为 5 分钟 K 线 =====
ohlcv_5min = pd.DataFrame()
ohlcv_5min["open"] = ohlcv_1min["open"].resample("5min").first()
ohlcv_5min["high"] = ohlcv_1min["high"].resample("5min").max()
ohlcv_5min["low"] = ohlcv_1min["low"].resample("5min").min()
ohlcv_5min["close"] = ohlcv_1min["close"].resample("5min").last()
ohlcv_5min["volume"] = ohlcv_1min["volume"].resample("5min").sum()
ohlcv_5min = ohlcv_5min.dropna()
print("\n5 分钟 K 线:")
print(ohlcv_5min.to_string())
7.3 跨市场时间校准案例(A 股 + 美股数据对齐)
以下示例演示如何将 A 股和美股数据统一到 UTC 时间轴上进行对比分析。
# ===== 模拟 A 股数据(北京时间) =====
a_stock_data = pd.DataFrame({
"local_time": [
"2024-01-15 09:30:00",
"2024-01-15 10:00:00",
"2024-01-15 10:30:00",
"2024-01-15 11:00:00",
"2024-01-15 13:00:00",
"2024-01-15 14:00:00",
"2024-01-15 15:00:00",
],
"price": [1680.0, 1685.0, 1682.0, 1690.0, 1688.0, 1695.0, 1700.0],
"market": ["A"] * 7,
})
# ===== 模拟美股数据(美东时间,考虑夏令时 EDT = UTC-4) =====
us_stock_data = pd.DataFrame({
"local_time": [
"2024-01-14 16:00:00", # 美股开盘
"2024-01-14 17:00:00",
"2024-01-14 18:00:00",
"2024-01-14 19:00:00",
"2024-01-14 20:00:00",
"2024-01-14 21:00:00",
"2024-01-14 22:00:00", # 美股收盘
],
"price": [480.0, 482.0, 479.0, 483.0, 485.0, 484.0, 486.0],
"market": ["US"] * 7,
})
# ===== 步骤 1:统一转换为 UTC =====
a_stock_data["utc_time"] = a_stock_data["local_time"].apply(
lambda x: convert_timezone(
datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
"Asia/Shanghai", "UTC"
)
)
us_stock_data["utc_time"] = us_stock_data["local_time"].apply(
lambda x: convert_timezone(
datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
"America/New_York", "UTC"
)
)
# ===== 步骤 2:合并到统一时间轴 =====
a_stock_data = a_stock_data[["utc_time", "price", "market"]].rename(columns={"price": "a_price"})
us_stock_data = us_stock_data[["utc_time", "price", "market"]].rename(columns={"price": "us_price"})
combined = pd.merge_ordered(a_stock_data, us_stock_data, on="utc_time", how="outer")
combined = combined.sort_values("utc_time").reset_index(drop=True)
# 前向填充以展示完整时间线
combined["a_price"] = combined["a_price"].ffill()
combined["us_price"] = combined["us_price"].ffill()
print("跨市场统一时间轴:")
print(combined.to_string(index=False))
# ===== 步骤 3:计算时间重叠区间 =====
a_times = set(a_stock_data["utc_time"])
us_times = set(us_stock_data["utc_time"])
overlap = a_times & us_times
print(f"\nA 股交易时段(UTC): {min(a_times)} ~ {max(a_times)}")
print(f"美股交易时段(UTC): {min(us_times)} ~ {max(us_times)}")
print(f"重叠时间点数量: {len(overlap)}")
八、最佳实践与注意事项
8.1 数据质量检查清单
在完成时间校准后,应按以下清单逐项检查数据质量:
- 时区一致性:所有时间是否已统一转换为 UTC?是否存在遗漏的本地时间?
- 格式一致性:DataFrame 中的时间列是否为
datetime64[ns, UTC]类型? - 单调性检查:时间序列是否严格单调递增?是否有乱序数据?
- 重复性检查:是否存在重复的时间戳?重复记录是否已正确处理?
- 完整性检查:交易日历是否完整?非交易日(周末、节假日)的数据是否已正确剔除或标记?
- 边界检查:夏令时切换日(3 月、11 月)的数据是否存在 1 小时偏移?
- 精度检查:毫秒/微秒级数据是否在存储过程中丢失精度?
- 跨年检查:跨年数据是否正确处理了年份切换?
- 闰秒检查:数据源是否包含闰秒(23:59:60)?是否需要特殊处理?
8.2 常见陷阱与解决方案
| 陷阱 | 说明 | 解决方案 |
|---|---|---|
| 时区信息丢失 | datetime 对象在序列化/反序列化过程中丢失时区信息 |
始终使用带时区的 datetime,存储时保留时区 |
| 夏令时边界错误 | 在 DST 切换日产生重复或缺失的时间点 | 使用 IANA 时区标识,让库自动处理 |
| Unix 时间戳精度混淆 | 秒级(10 位)与毫秒级(13 位)时间戳混淆 | 先根据数字长度判断精度,再解析 |
| 日期格式歧义 | 01/02/2024 在美式和中式格式下含义不同 |
明确数据来源,使用对应的解析格式 |
| Pandas 时区陷阱 | pd.Timestamp.tz_localize() 与 tz_convert() 混淆 |
localize 是"赋予时区",convert 是"转换时区" |
| 字符串与 datetime 混用 | DataFrame 中时间列类型不一致 | 统一转换为 pd.DatetimeTZDtype(tz="UTC") |
| 交易所时间与自然时间 | 交易所交易时段与自然日不一致(如夜盘) | 使用交易所日历而非自然日历 |
| 数据库时区设置 | 数据库连接的时区设置影响查询结果 | 数据库统一存储 UTC,连接时设置时区 |
| 浮点数时间戳精度丢失 | 部分系统用 float 存储 Unix 时间戳,精度丢失 | 使用整数存储时间戳,避免浮点数 |
| 2038 年问题 | 32 位 Unix 时间戳在 2038 年溢出 | 使用 64 位整数或 ISO 8601 字符串存储 |
8.3 性能优化建议
1. 批量解析替代逐行解析
当需要解析大量时间字符串时,应使用 Pandas 的向量化操作替代逐行 apply:
# 慢:逐行解析
df["utc_time"] = df["raw_time"].apply(lambda x: parse_to_datetime(x))
# 快:向量化解析(对于已知格式)
df["utc_time"] = pd.to_datetime(df["raw_time"], format="%Y%m%d")
df["utc_time"] = df["utc_time"].dt.tz_localize("Asia/Shanghai").dt.tz_convert("UTC")
2. 缓存时区对象
避免在循环中重复创建时区对象:
# 慢
for t in times:
dt = t.astimezone(ZoneInfo("America/New_York"))
# 快
ny_tz = ZoneInfo("America/New_York")
for t in times:
dt = t.astimezone(ny_tz)
3. 使用整数时间戳进行存储和计算
在数据库中优先使用 Unix 毫秒时间戳(BIGINT)进行存储,避免字符串解析开销。仅在展示层转换为可读格式。
4. 预生成交易日历
对于日线数据,预先生成交易日历表并缓存,避免每次查询时重新计算:
def generate_trade_calendar(start, end, market="A"):
"""预生成交易日历"""
# 可使用 exchange_calendars 库
import exchange_calendars as ec
cal = ec.get_calendar("XSHG" if market == "A" else "XNYS")
return cal.valid_days(start_date=start, end_date=end)
5. 使用 Numba 加速 Tick 数据处理
对于百万级以上的 Tick 数据,可使用 Numba 加速时间戳解析:
from numba import njit
import numpy as np
@njit
def parse_tick_timestamps_fast(arr):
"""使用 Numba 加速批量 Tick 时间戳解析"""
n = len(arr)
result = np.empty(n, dtype=np.int64)
for i in range(n):
s = str(arr[i])
year = int(s[0:4])
month = int(s[4:6])
day = int(s[6:8])
hour = int(s[8:10])
minute = int(s[10:12])
second = int(s[12:14])
ms = int(s[14:17])
# 转换为 Unix 毫秒时间戳(简化版,未处理闰秒)
# 实际使用时建议使用 pandas 向量化方法
result[i] = 0 # 占位,实际需完整实现
return result
6. 增量更新策略
对于持续运行的数据管道,采用增量更新而非全量重建:
def incremental_update(existing_df, new_data, time_col="utc_time"):
"""增量合并新数据到已有 DataFrame"""
if existing_df is None or existing_df.empty:
return new_data
last_time = existing_df[time_col].max()
new_records = new_data[new_data[time_col] > last_time]
if new_records.empty:
return existing_df
return pd.concat([existing_df, new_records], ignore_index=True)
文档版本:v1.0
最后更新:2025 年 5 月
适用 Python 版本:3.9+
核心依赖:pandas,python-dateutil,zoneinfo(标准库),pytz(可选)
上述内容经供学习参考,股市有风险投资需谨慎