美股热点新闻数据模块(技术学习版)
免责声明:本文档仅供技术学习和学术研究参考,不构成任何投资建议、投资咨询或证券交易指导。股市有风险,投资需谨慎。任何基于本文档内容进行的投资决策,均由投资者自行承担全部风险和责任。美股市场受美国SEC监管,跨境投资需遵守相关法律法规。文档中所有策略均以"技术分析工具框架"形式呈现,不包含任何具体个股推荐、买卖建议或收益承诺。
一、模块概述
1.1 模块定位
本模块旨在构建一个面向学术研究和技术学习的美股市场公开新闻资讯数据获取与处理框架。模块核心功能为:获取近90个交易日美股市场公开新闻资讯数据,并对数据进行结构化存储、NLP情感分析、中概股映射以及可视化展示。
90个交易日约覆盖一个完整季度(约4.5个自然月),能够捕捉季度财报周期、美联储议息会议、重要经济数据发布等关键事件窗口内的市场新闻动态,为学术研究提供充足的数据样本。
1.2 美股新闻的重要性
美股是全球最大资本市场,日均交易量超过数百亿美元。其市场动态对全球金融市场(包括A股、港股、欧股、新兴市场)具有重要影响。了解美股新闻资讯对于以下学术研究方向具有参考价值:
- 跨境市场联动研究:研究美股波动如何通过信息渠道传导至其他市场
- 事件驱动分析:分析特定新闻事件对市场情绪和价格行为的影响机制
- NLP技术应用:将自然语言处理技术应用于金融文本分析领域
1.3 数据来源
本模块整合以下公开数据渠道:
| 数据源类型 | 具体来源 | 数据特点 |
|---|---|---|
| 开放API | Yahoo Finance (yfinance) | 免费个股新闻,英文 |
| 开放API | Finnhub | 结构化新闻数据,支持筛选 |
| 开放API | Alpha Vantage | 基本面+新闻,有限免费额度 |
| 财经媒体 | Seeking Alpha | 深度分析文章,需注意版权 |
| 财经媒体 | MarketWatch / CNBC | 实时新闻快讯 |
| 中文媒体 | 华尔街见闻 / 财联社 / 雪球 | 中文美股报道 |
1.4 应用场景声明
本模块所有功能和数据仅供学术研究和技术学习使用,明确不用于实际交易。 合法应用场景包括但不限于:
- 金融工程课程教学演示
- NLP模型训练与评估
- 跨境市场联动学术研究
- 金融数据工程实践学习
二、美股市场新闻分类
2.1 个股新闻
个股新闻是美股资讯的核心组成部分,覆盖上市公司层面的重大事件:
| 子类别 | 说明 | 学术研究价值 |
|---|---|---|
| 财报发布 | 季度/年度财报(10-K/10-Q)、财报电话会议 | 盈余公告后价格漂移(PEAD)研究 |
| 并购重组 | M&A公告、要约收购、资产剥离 | 事件研究法(Event Study) |
| 产品发布 | 新产品/服务上线、技术突破 | 创新事件的市场反应 |
| 管理层变动 | CEO更替、CFO变动、董事变更 | 管理层信号传递理论 |
| 监管动态 | FDA审批、反垄断调查、专利诉讼 | 监管事件风险定价 |
| 分析师评级 | 升降级、目标价调整、首次覆盖 | 分析师行为研究 |
2.2 宏观新闻
宏观层面的新闻对整个美股市场产生系统性影响:
- 美联储相关:FOMC议息决议、利率决议、鲍威尔讲话、货币政策纪要
- 经济数据:非农就业(NFP)、CPI/PCE通胀数据、GDP、PMI、零售销售
- 财政政策:美国国债上限、财政预算、税收政策变动
- 地缘政治:国际贸易摩擦、地缘冲突、大选动态
2.3 行业新闻
重点关注的行业板块及其核心驱动因素:
- 科技(Technology):AI技术进展、云计算、SaaS订阅数据、大型科技股(Mega Cap Tech)
- 半导体(Semiconductor):芯片需求周期、台积电/英伟达业绩指引、出口管制政策
- 医药/生物科技(Healthcare):FDA审批结果、临床试验数据、医保政策
- 能源(Energy):原油/天然气价格、OPEC+决策、新能源转型
- 金融(Financials):银行财报、监管资本要求、利率环境对净息差影响
2.4 中概股新闻
中概股(Chinese Concept Stocks)是连接美股与A股的重要桥梁:
- 财报与业绩:阿里巴巴、拼多多、京东等中概股季度业绩
- 中美监管动态:PCAOB审计监管合作进展
- 上市/退市动态:回港双重上市、退市风险警示
- 行业政策影响:中国国内行业监管政策对在美上市公司的影响
三、数据结构设计
3.1 美股新闻主表(Schema)
CREATE TABLE IF NOT EXISTS us_stock_news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
news_id TEXT UNIQUE NOT NULL, -- 新闻唯一标识(MD5哈希)
title TEXT NOT NULL, -- 新闻标题
summary TEXT, -- 新闻摘要
content TEXT, -- 新闻正文(如有)
source TEXT NOT NULL, -- 来源媒体
source_url TEXT, -- 原文链接
publish_time TIMESTAMP NOT NULL, -- 发布时间(UTC)
collect_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 采集时间
-- 关联标的
related_tickers TEXT, -- 关联股票代码(逗号分隔)
news_type TEXT, -- 类型:stock/macro/sector/chinese
sector TEXT, -- 所属行业
-- NLP分析结果
sentiment_score REAL, -- 情感得分(-1到1)
sentiment_label TEXT, -- 情感标签:positive/neutral/negative
keywords TEXT, -- 关键词(JSON数组)
importance INTEGER DEFAULT 0, -- 重要性评分(0-10)
-- 元数据
language TEXT DEFAULT 'en', -- 语言:en/zh
is_duplicate INTEGER DEFAULT 0, -- 是否重复
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_news_publish_time ON us_stock_news(publish_time);
CREATE INDEX idx_news_tickers ON us_stock_news(related_tickers);
CREATE INDEX idx_news_type ON us_stock_news(news_type);
CREATE INDEX idx_news_sentiment ON us_stock_news(sentiment_score);
3.2 中概股映射表
CREATE TABLE IF NOT EXISTS chinese_concept_mapping (
id INTEGER PRIMARY KEY AUTOINCREMENT,
us_ticker TEXT UNIQUE NOT NULL, -- 美股代码(如 BABA)
us_name TEXT, -- 美股名称
hk_ticker TEXT, -- 港股代码(如 9988.HK)
hk_name TEXT, -- 港股名称
cn_ticker TEXT, -- A股代码(如有)
cn_name TEXT, -- A股名称
sector TEXT, -- 所属行业
adr_ratio REAL, -- ADR比率
note TEXT, -- 备注
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
初始数据示例(仅供技术学习参考,不构成任何投资建议):
| 美股代码 | 公司名称 | 港股代码 | 行业 |
|---|---|---|---|
| BABA | Alibaba Group | 9988.HK | 电商 |
| PDD | PDD Holdings | — | 电商 |
| JD | JD.com | 9618.HK | 电商 |
| BIDU | Baidu | 9888.HK | AI/搜索 |
| NIO | NIO Inc. | 9866.HK | 新能源车 |
| LI | Li Auto | 2015.HK | 新能源车 |
| XPEV | XPeng | 9868.HK | 新能源车 |
| NETEASE | NetEase | 9999.HK | 游戏 |
| EDU | New Oriental | 9901.HK | 教育 |
3.3 行业影响映射表
CREATE TABLE IF NOT EXISTS sector_impact_mapping (
id INTEGER PRIMARY KEY AUTOINCREMENT,
us_sector TEXT NOT NULL, -- 美股行业
cn_sector TEXT NOT NULL, -- 对应A股行业/概念
correlation_note TEXT, -- 关联说明(学术参考)
typical_tickers_us TEXT, -- 代表性美股代码
typical_tickers_cn TEXT, -- 代表性A股代码
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
四、数据采集方案
4.1 API数据源
4.1.1 yfinance(推荐入门使用)
import yfinance as yf
# 获取个股新闻(免费,无需API Key)
ticker = yf.Ticker("AAPL")
news = ticker.news # 返回最近新闻列表
特点:免费、无需注册、数据来源为Yahoo Finance聚合。限制:请求频率不宜过高,建议每分钟不超过60次。
4.1.2 Finnhub
import requests
API_KEY = "your_api_key" # 免费注册获取
url = f"https://finnhub.io/api/v1/company-news?symbol=AAPL&from=2026-03-01&to=2026-06-10&token={API_KEY}"
response = requests.get(url)
特点:提供结构化公司新闻,免费版每分钟60次调用。支持按日期范围筛选,适合获取近90日数据。
4.1.3 Alpha Vantage
url = f"https://www.alphavantage.co/query?function=NEWS_SENTIMENT&tickers=AAPL&apikey={API_KEY}"
特点:自带情感评分,免费版每日25次调用。适合小规模研究。
4.1.4 NewsAPI
url = f"https://newsapi.org/v2/everything?q=US+stock+market&from=2026-03-01&sortBy=publishedAt&apiKey={API_KEY}"
特点:覆盖面广,免费版仅支持过去30天数据。研究用途可申请Developer权限。
4.2 财经媒体
| 媒体 | URL模式 | 数据特点 | 版权说明 |
|---|---|---|---|
| Bloomberg | bloomberg.com/markets | 权威宏观+市场数据 | 订阅制,注意版权 |
| Reuters | reuters.com/markets | 全球市场快讯 | 部分免费 |
| Seeking Alpha | seekingalpha.com | 深度个股分析 | 注意版权,仅供研究 |
| MarketWatch | marketwatch.com | 实时市场新闻 | 部分免费 |
| CNBC | Markets | 美股重点新闻 | 部分免费 |
4.3 中文媒体美股频道
| 媒体 | 频道 | 特点 |
|---|---|---|
| 华尔街见闻 | 美股频道 | 快讯+深度,实时更新 |
| 财联社 | 电报 | 实时快讯,含美股 |
| 雪球 | 美股板块 | 社区讨论+新闻聚合 |
4.4 采集规范
- 频率控制:所有API调用必须设置合理的请求间隔(建议>=1秒),遵守各平台Rate Limit
- 数据时效:新闻数据具有时效性,建议每日定时采集,存储原始数据
- 去重处理:同一新闻可能被多个来源转载,需基于标题+内容哈希去重
- 版权声明:采集的数据仅供个人学术研究使用,不得用于商业用途或公开发布
- User-Agent:HTTP请求应设置合理的User-Agent标识
五、近90日美股新闻获取实现
5.1 全市场热点新闻获取
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import hashlib
import time
import json
class USStockNewsFetcher:
"""
美股热点新闻获取器(仅供技术学习和学术研究)
获取近90个交易日的美股市场公开新闻数据
"""
def __init__(self, finnhub_api_key: str = "", alpha_vantage_key: str = ""):
self.finnhub_api_key = finnhub_api_key
self.alpha_vantage_key = alpha_vantage_key
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'AcademicResearchBot/1.0 (Research Use Only)'
})
def get_trading_days_90(self) -> tuple:
"""计算近90个交易日的日期范围(约覆盖过去4.5个月)"""
end_date = datetime.now()
start_date = end_date - timedelta(days=130) # 约130个自然日覆盖90个交易日
return start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')
def fetch_finnhub_market_news(self, start_date: str, end_date: str) -> List[Dict]:
"""
通过Finnhub获取全市场新闻
API文档:https://finnhub.io/docs/api/company-news
"""
if not self.finnhub_api_key:
print("[提示] 未设置Finnhub API Key,跳过Finnhub数据源")
return []
all_news = []
# 分段获取,每次最多30天
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
chunk_days = 30
current_start = start_dt
while current_start < end_dt:
current_end = min(current_start + timedelta(days=chunk_days), end_dt)
url = (
f"https://finnhub.io/api/v1/company-news?"
f"symbol=SPY&from={current_start.strftime('%Y-%m-%d')}"
f"&to={current_end.strftime('%Y-%m-%d')}"
f"&token={self.finnhub_api_key}"
)
try:
resp = self.session.get(url, timeout=15)
if resp.status_code == 200:
data = resp.json()
for item in data:
news_item = {
'news_id': hashlib.md5(
f"{item.get('headline', '')}{item.get('datetime', 0)}".encode()
).hexdigest(),
'title': item.get('headline', ''),
'summary': item.get('summary', ''),
'source': item.get('source', ''),
'source_url': item.get('url', ''),
'publish_time': datetime.fromtimestamp(
item.get('datetime', 0)
).isoformat(),
'related_tickers': ','.join(
item.get('related', '').split(',')[:5]
) if item.get('related') else '',
'news_type': 'market',
'language': 'en'
}
all_news.append(news_item)
else:
print(f"[警告] Finnhub请求失败: HTTP {resp.status_code}")
except Exception as e:
print(f"[错误] Finnhub请求异常: {e}")
current_start = current_end
time.sleep(1) # 遵守频率限制
print(f"[信息] Finnhub获取到 {len(all_news)} 条全市场新闻")
return all_news
def fetch_yfinance_news(self, tickers: List[str]) -> List[Dict]:
"""
通过yfinance获取个股新闻
注意:需安装 yfinance 库 (pip install yfinance)
"""
try:
import yfinance as yf
except ImportError:
print("[提示] 未安装yfinance,请执行: pip install yfinance")
return []
all_news = []
for ticker_symbol in tickers:
try:
ticker = yf.Ticker(ticker_symbol)
news_list = ticker.news
if news_list:
for item in news_list:
news_item = {
'news_id': hashlib.md5(
f"{item.get('title', '')}{item.get('publisher', '')}".encode()
).hexdigest(),
'title': item.get('title', ''),
'summary': '',
'source': item.get('publisher', ''),
'source_url': item.get('link', ''),
'publish_time': datetime.fromtimestamp(
item.get('providerPublishTime', 0)
).isoformat(),
'related_tickers': ticker_symbol,
'news_type': 'stock',
'language': 'en'
}
all_news.append(news_item)
time.sleep(0.5)
except Exception as e:
print(f"[错误] 获取 {ticker_symbol} 新闻失败: {e}")
print(f"[信息] yfinance获取到 {len(all_news)} 条个股新闻")
return all_news
def fetch_alpha_vantage_news(self, tickers: List[str]) -> List[Dict]:
"""
通过Alpha Vantage获取带情感评分的新闻
API文档:https://www.alphavantage.co/documentation/#news-sentiment
"""
if not self.alpha_vantage_key:
print("[提示] 未设置Alpha Vantage API Key,跳过")
return []
all_news = []
for ticker_symbol in tickers[:3]: # 免费版限制每日调用次数
url = (
f"https://www.alphavantage.co/query?"
f"function=NEWS_SENTIMENT&tickers={ticker_symbol}"
f"&limit=50&apikey={self.alpha_vantage_key}"
)
try:
resp = self.session.get(url, timeout=15)
if resp.status_code == 200:
data = resp.json()
for item in data.get('feed', []):
sentiment_data = next(
(s for s in item.get('ticker_sentiment', [])
if s.get('ticker') == ticker_symbol),
{}
)
news_item = {
'news_id': hashlib.md5(
f"{item.get('title', '')}{item.get('time_published', '')}".encode()
).hexdigest(),
'title': item.get('title', ''),
'summary': item.get('summary', ''),
'source': item.get('source', ''),
'source_url': item.get('url', ''),
'publish_time': item.get('time_published', ''),
'related_tickers': ticker_symbol,
'news_type': 'stock',
'sentiment_score': float(
sentiment_data.get('relevance_score', 0)
) * float(
sentiment_data.get('ticker_sentiment_score', 0)
),
'sentiment_label': sentiment_data.get('ticker_sentiment_label', 'neutral'),
'language': 'en'
}
all_news.append(news_item)
time.sleep(12) # Alpha Vantage免费版5次/分钟
except Exception as e:
print(f"[错误] Alpha Vantage请求异常: {e}")
print(f"[信息] Alpha Vantage获取到 {len(all_news)} 条新闻")
return all_news
5.2 个股新闻获取
def fetch_stock_news_multi(self, ticker_list: List[str]) -> Dict[str, List[Dict]]:
"""
批量获取多只个股的新闻
ticker_list: 股票代码列表,如 ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA']
返回: {ticker: [news_items]}
"""
results = {}
for ticker in ticker_list:
try:
import yfinance as yf
t = yf.Ticker(ticker)
news = t.news
results[ticker] = []
if news:
for item in news:
results[ticker].append({
'title': item.get('title', ''),
'publisher': item.get('publisher', ''),
'link': item.get('link', ''),
'publish_time': datetime.fromtimestamp(
item.get('providerPublishTime', 0)
).strftime('%Y-%m-%d %H:%M:%S')
})
time.sleep(0.5)
except Exception as e:
print(f"[错误] {ticker} 新闻获取失败: {e}")
results[ticker] = []
return results
5.3 中概股新闻获取
# 中概股核心列表(仅供技术学习参考)
CHINESE_CONCEPT_STOCKS = {
'BABA': {'name': 'Alibaba Group', 'sector': '电商'},
'PDD': {'name': 'PDD Holdings', 'sector': '电商'},
'JD': {'name': 'JD.com', 'sector': '电商'},
'BIDU': {'name': 'Baidu Inc.', 'sector': 'AI/搜索'},
'NIO': {'name': 'NIO Inc.', 'sector': '新能源车'},
'LI': {'name': 'Li Auto', 'sector': '新能源车'},
'XPEV': {'name': 'XPeng Inc.', 'sector': '新能源车'},
'NETEASE': {'name': 'NetEase', 'sector': '游戏'},
'EDU': {'name': 'New Oriental', 'sector': '教育'},
'IQ': {'name': 'iQIYI', 'sector': '流媒体'},
'TME': {'name': 'Tencent Music', 'sector': '音乐娱乐'},
'MNTS': {'name': 'Momentus', 'sector': '航天科技'},
}
def fetch_chinese_concept_news(self) -> Dict[str, List[Dict]]:
"""获取中概股新闻"""
tickers = list(self.CHINESE_CONCEPT_STOCKS.keys())
results = self.fetch_stock_news_multi(tickers)
# 标注中概股信息
for ticker, news_list in results.items():
info = self.CHINESE_CONCEPT_STOCKS.get(ticker, {})
for news in news_list:
news['is_chinese_concept'] = True
news['cn_sector'] = info.get('sector', '')
news['news_type'] = 'chinese'
return results
5.4 盘前盘后重大新闻
def fetch_pre_post_market_news(self) -> List[Dict]:
"""
获取盘前(4:00-9:30 ET)和盘后(16:00-20:00 ET)重大新闻
这些时段的新闻通常对次日/后续交易产生较大影响
"""
try:
import yfinance as yf
except ImportError:
return []
major_tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA',
'TSLA', 'META', 'SPY', 'QQQ']
all_news = []
for ticker in major_tickers:
try:
t = yf.Ticker(ticker)
news = t.news
if news:
for item in news:
pub_ts = item.get('providerPublishTime', 0)
pub_dt = datetime.utcfromtimestamp(pub_ts)
hour_utc = pub_dt.hour
# 美东时间 = UTC - 4(夏令时)或 UTC - 5(冬令时)
# 简化判断:UTC 8-13点 对应美东盘前;UTC 20-24点 对应美东盘后
is_pre_market = 8 <= hour_utc <= 13
is_post_market = 20 <= hour_utc <= 24
if is_pre_market or is_post_market:
all_news.append({
'title': item.get('title', ''),
'ticker': ticker,
'publish_time': pub_dt.isoformat(),
'session': 'pre_market' if is_pre_market else 'post_market',
'source': item.get('publisher', '')
})
time.sleep(0.5)
except Exception as e:
print(f"[错误] {ticker} 盘前盘后新闻获取失败: {e}")
print(f"[信息] 获取到 {len(all_news)} 条盘前盘后新闻")
return all_news
六、美股-A股映射分析(仅供学术研究)
声明:本章节内容仅供学术研究参考,不构成任何投资建议。以下映射关系基于行业关联性的学术研究框架,实际市场联动受多种因素影响,历史关联不代表未来表现。
6.1 中概股直接映射
中概股是在美股上市的中国企业,其股价同时受美股市场情绪和中国国内基本面影响,是研究跨境市场联动的天然样本。
class ChineseConceptMapper:
"""
中概股映射工具(仅供学术研究参考)
"""
# 中概股 -> A股/港股 映射表
MAPPING = {
'BABA': {
'us_name': 'Alibaba Group',
'hk_ticker': '9988.HK',
'cn_peers': ['苏宁易购', '南极电商'], # 行业可比公司(仅供参考)
'sector': '电子商务',
'note': '港股为主要上市地,美股为二次上市'
},
'PDD': {
'us_name': 'PDD Holdings',
'hk_ticker': None,
'cn_peers': ['阿里巴巴'],
'sector': '社交电商',
'note': '尚未在港股上市'
},
'NIO': {
'us_name': 'NIO Inc.',
'hk_ticker': '9866.HK',
'cn_peers': ['比亚迪', '理想汽车', '小鹏汽车'],
'sector': '新能源汽车',
'note': '新能源车板块联动性较强'
},
'BIDU': {
'us_name': 'Baidu Inc.',
'hk_ticker': '9888.HK',
'cn_peers': ['三六零', '科大讯飞'],
'sector': 'AI/搜索引擎',
'note': 'AI概念联动'
},
}
def get_mapping(self, us_ticker: str) -> Optional[Dict]:
"""获取单只中概股映射信息"""
return self.MAPPING.get(us_ticker.upper())
def get_all_mappings(self) -> Dict:
"""获取全部映射"""
return self.MAPPING
def get_sector_stocks(self, sector: str) -> List[str]:
"""按行业筛选中概股"""
return [
ticker for ticker, info in self.MAPPING.items()
if info['sector'] == sector
]
6.2 行业映射分析框架
# 美股行业 -> A股行业/概念 映射框架
SECTOR_MAPPING = {
'US Technology': {
'cn_concepts': ['人工智能', '云计算', '国产软件', '信创'],
'correlation_note': 'AI技术进展在两市场均有映射',
'research_methodology': '可使用Granger因果检验分析信息传导方向'
},
'US Semiconductors': {
'cn_concepts': ['芯片概念', '半导体设备', '国产替代'],
'correlation_note': '半导体周期全球联动,出口管制增加A股国产替代逻辑',
'research_methodology': '可分析美股半导体指数与A股芯片指数的协整关系'
},
'US Healthcare/Biotech': {
'cn_concepts': ['创新药', 'CRO/CDMO', '医疗器械'],
'correlation_note': 'FDA审批结果对全球医药板块有示范效应',
'research_methodology': '事件研究法分析FDA审批事件的市场反应'
},
'US Energy': {
'cn_concepts': ['石油石化', '新能源', '光伏', '储能'],
'correlation_note': '原油价格全球定价,新能源政策各自独立',
'research_methodology': '可分析WTI原油与A股能源板块的相关性'
},
'US Financials': {
'cn_concepts': ['银行', '券商', '保险', '金融科技'],
'correlation_note': '利率环境对两地金融股均有影响',
'research_methodology': '利率敏感性分析'
},
}
6.3 情绪传导研究框架
class SentimentTransmissionAnalyzer:
"""
美股->A股情绪传导分析框架(仅供学术研究)
研究方法:基于新闻情感的时间序列分析
"""
def __init__(self):
self.us_daily_sentiment = {} # date -> avg_sentiment
self.cn_daily_sentiment = {} # date -> avg_sentiment
def calculate_daily_sentiment(self, news_list: List[Dict]) -> Dict[str, float]:
"""计算每日平均情感得分"""
daily_scores = {}
for news in news_list:
date = news['publish_time'][:10]
score = news.get('sentiment_score', 0)
if date not in daily_scores:
daily_scores[date] = []
daily_scores[date].append(score)
return {
date: sum(scores) / len(scores)
for date, scores in daily_scores.items()
if scores
}
def analyze_transmission(self, lag_days: int = 1) -> Dict:
"""
分析情绪传导效应
lag_days: 美股情绪领先A股的天数
返回: 统计描述(仅供学术参考)
"""
# 框架示意:实际研究需使用正式统计方法
# 如:向量自回归(VAR)、Granger因果检验、CCF(交叉相关函数)
framework = {
'method': 'Granger Causality Test / Cross-Correlation Function',
'null_hypothesis': '美股新闻情绪不Granger引起A股情绪变化',
'data_requirement': '至少2年的日度情感数据',
'control_variables': ['汇率变动', '北向资金流向', '宏观事件'],
'note': '仅供学术研究方法论参考,不构成投资建议'
}
return framework
6.4 时间窗口分析
| 时间窗口 | 说明 | 研究应用 |
|---|---|---|
| T+0(当日同步) | 美股盘中的新闻对A股当日影响有限(时差) | 分析信息效率 |
| T+1(次日开盘) | 美股隔夜新闻对A股开盘价的影响 | 跳空缺口研究 |
| T+1~T+5(短期) | 美股趋势对A股短期走势的影响 | 动量传导研究 |
| T+5~T+22(中期) | 行业基本面变化的传导 | 中期联动研究 |
七、NLP处理
7.1 英文新闻情感分析
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
class FinancialSentimentAnalyzer:
"""
金融新闻情感分析器
支持两种模型:VADER(基于规则)和 FinBERT(基于深度学习)
"""
def __init__(self, use_finbert: bool = False):
self.use_finbert = use_finbert
if use_finbert:
# FinBERT:专为金融文本预训练的BERT模型
# 论文:https://arxiv.org/abs/1906.08022
# 模型:ProsusAI/finbert
try:
self.tokenizer = AutoTokenizer.from_pretrained(
'ProsusAI/finbert'
)
self.model = AutoModelForSequenceClassification.from_pretrained(
'ProsusAI/finbert'
)
self.model.eval()
self.labels = ['positive', 'negative', 'neutral']
except Exception as e:
print(f"[警告] FinBERT加载失败: {e},回退到VADER")
self.use_finbert = False
if not self.use_finbert:
self.vader = SentimentIntensityAnalyzer()
def analyze_vader(self, text: str) -> Dict:
"""VADER情感分析(基于规则,速度快)"""
scores = self.vader.polarity_scores(text)
return {
'sentiment_score': scores['compound'], # -1到1
'sentiment_label': (
'positive' if scores['compound'] >= 0.05
else 'negative' if scores['compound'] <= -0.05
else 'neutral'
),
'positive_ratio': scores['pos'],
'negative_ratio': scores['neg'],
'neutral_ratio': scores['neu'],
'method': 'VADER'
}
def analyze_finbert(self, text: str) -> Dict:
"""FinBERT情感分析(深度学习,精度高)"""
inputs = self.tokenizer(
text[:512], # BERT最大token长度
return_tensors='pt',
truncation=True,
max_length=512
)
with torch.no_grad():
outputs = self.model(**inputs)
probs = torch.softmax(outputs.logits, dim=1)[0]
best_idx = torch.argmax(probs).item()
# FinBERT标签顺序:positive=0, negative=1, neutral=2
label_map = {0: 'positive', 1: 'negative', 2: 'neutral'}
return {
'sentiment_score': float(probs[best_idx]),
'sentiment_label': label_map[best_idx],
'positive_prob': float(probs[0]),
'negative_prob': float(probs[1]),
'neutral_prob': float(probs[2]),
'method': 'FinBERT'
}
def analyze(self, text: str) -> Dict:
"""统一分析接口"""
if not text or len(text.strip()) < 10:
return {
'sentiment_score': 0.0,
'sentiment_label': 'neutral',
'method': 'default'
}
if self.use_finbert:
return self.analyze_finbert(text)
else:
return self.analyze_vader(text)
def batch_analyze(self, texts: List[str]) -> List[Dict]:
"""批量分析"""
return [self.analyze(text) for text in texts]
7.2 关键信息提取
import re
from collections import Counter
class NewsInfoExtractor:
"""新闻关键信息提取器"""
# 美股常见财经关键词模式
PATTERNS = {
'earnings': re.compile(
r'(earnings|EPS|revenue|profit|loss|beat|miss|guidance)',
re.IGNORECASE
),
'fed': re.compile(
r'(Fed|Federal Reserve|FOMC|interest rate|rate hike|rate cut|Powell)',
re.IGNORECASE
),
'ma': re.compile(
r'(acquire|merger|buyout|takeover|deal|acquisition)',
re.IGNORECASE
),
'upgrade': re.compile(
r'(upgrade|downgrade|initiate|overweight|underweight|outperform)',
re.IGNORECASE
),
'price_target': re.compile(
r'(price target|PT|\$\d+)', re.IGNORECASE
),
}
def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
"""提取关键词(基于词频+金融领域过滤)"""
# 简单的词频统计(实际项目中建议使用TF-IDF或YAKE)
words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
# 过滤停用词
stop_words = {
'the', 'and', 'for', 'that', 'this', 'with', 'from', 'have',
'will', 'been', 'would', 'could', 'their', 'there', 'about',
'which', 'when', 'what', 'your', 'they', 'said', 'also'
}
filtered = [w for w in words if w not in stop_words and len(w) > 3]
return [word for word, _ in Counter(filtered).most_common(top_n)]
def extract_ticker_mentions(self, text: str) -> List[str]:
"""从文本中提取股票代码"""
# 匹配大写字母+数字的股票代码模式
pattern = re.compile(r'\b([A-Z]{2,5}\b)')
matches = pattern.findall(text)
# 过滤常见非股票代码的大写词
non_tickers = {'THE', 'AND', 'FOR', 'NOT', 'BUT', 'WITH', 'THIS',
'THAT', 'FROM', 'HAVE', 'WILL', 'BEEN', 'OVER'}
return list(set(m for m in matches if m not in non_tickers))
def classify_news_category(self, title: str, summary: str = '') -> str:
"""分类新闻类型"""
text = f"{title} {summary}".lower()
if any(p.search(text) for p in [
self.PATTERNS['fed'],
re.compile(r'(gdp|inflation|cpi|employment|unemployment)', re.I)
]):
return 'macro'
if any(p.search(text) for p in [
self.PATTERNS['earnings'],
self.PATTERNS['ma'],
self.PATTERNS['upgrade']
]):
return 'stock'
sector_keywords = {
'tech': ['ai', 'artificial intelligence', 'cloud', 'software', 'chip'],
'healthcare': ['fda', 'drug', 'trial', 'pharma', 'biotech'],
'energy': ['oil', 'gas', 'solar', 'renewable', 'opec'],
'finance': ['bank', 'loan', 'interest', 'mortgage', 'credit']
}
for sector, keywords in sector_keywords.items():
if any(kw in text for kw in keywords):
return f'sector_{sector}'
return 'general'
7.3 新闻去重与聚类
from difflib import SequenceMatcher
class NewsDeduplicator:
"""新闻去重与聚类"""
def __init__(self, similarity_threshold: float = 0.85):
self.threshold = similarity_threshold
self.seen_hashes = set()
def compute_similarity(self, text1: str, text2: str) -> float:
"""计算两段文本的相似度(基于序列匹配)"""
return SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
def generate_hash(self, title: str) -> str:
"""生成新闻标题哈希用于快速去重"""
return hashlib.md5(title.encode()).hexdigest()
def is_duplicate(self, title: str) -> bool:
"""快速哈希去重"""
h = self.generate_hash(title)
if h in self.seen_hashes:
return True
self.seen_hashes.add(h)
return False
def deduplicate_list(self, news_list: List[Dict]) -> List[Dict]:
"""
对新闻列表进行去重
第一层:标题哈希精确去重
第二层:相似度模糊去重(处理不同来源的同一新闻)
"""
unique_news = []
seen_titles = []
for news in news_list:
title = news.get('title', '')
# 第一层:精确去重
if self.is_duplicate(title):
news['is_duplicate'] = 1
continue
# 第二层:模糊去重
is_similar = False
for seen_title in seen_titles:
if self.compute_similarity(title, seen_title) > self.threshold:
is_similar = True
news['is_duplicate'] = 1
break
if not is_similar:
unique_news.append(news)
seen_titles.append(title)
print(f"[信息] 去重: {len(news_list)} -> {len(unique_news)} 条")
return unique_news
八、学术研究应用示例
声明:以下内容仅供学术研究方法论参考,不构成任何投资建议或策略推荐。
8.1 跨境市场联动研究框架
class CrossMarketResearchFramework:
"""
美股-A股跨境市场联动研究框架(仅供学术研究)
"""
def research_design(self) -> Dict:
"""
研究设计框架
研究问题:美股新闻情感是否对A股市场产生显著的信息传导效应?
"""
return {
'research_question': (
'美股市场新闻情感对A股市场是否存在显著的信息传导效应?'
),
'hypotheses': {
'H1': '美股负面新闻情感与次日A股开盘下跌显著相关',
'H2': '中概股新闻对A股相关板块的影响大于非中概股新闻',
'H3': '宏观新闻的信息传导效率高于个股新闻'
},
'data_requirements': {
'us_news': '近90日美股新闻 + 情感分析得分',
'cn_market': 'A股指数/个股日度数据(开盘/收盘/成交量)',
'control_variables': '汇率(USDCNY)、北向资金、VIX指数'
},
'methods': [
'事件研究法(Event Study)',
'向量自回归模型(VAR)',
'Granger因果检验',
'面板数据回归(Panel Regression)',
'DID(双重差分法)用于政策事件'
],
'note': '以上仅为研究方法论框架,不构成投资建议'
}
def event_study_template(self) -> Dict:
"""事件研究法模板"""
return {
'event_definition': '美股重大新闻发布(|sentiment| > 0.7)',
'event_window': 'T-5 到 T+5(共11个交易日)',
'estimation_window': 'T-120 到 T-6(用于估计正常收益率)',
'model': '市场模型法(Market Model)',
'ar_calculation': 'AR_t = R_t - (alpha + beta * R_m_t)',
'car_calculation': 'CAR = sum(AR_t) over event window',
'statistical_test': 't-test for CAR != 0',
'references': [
'Fama, Fisher, Jensen, Roll (1969)',
'Brown & Warner (1985)'
]
}
8.2 信息传导效率研究方法论
def information_transmission_analysis(self) -> Dict:
"""信息传导效率研究方法论"""
return {
'research_topic': '美股新闻信息向A股市场的传导效率',
'key_variables': {
'dependent': 'A股相关标的的超额收益率(AR)',
'independent': '美股新闻情感得分、新闻类型虚拟变量',
'mediating': '北向资金流向、机构持仓变动',
'moderating': '市场波动率(VIX)、中美利差'
},
'analysis_steps': [
'1. 数据收集:美股新闻 + A股行情 + 控制变量',
'2. 情感标注:对美股新闻进行NLP情感分析',
'3. 事件对齐:按交易日对齐美股新闻与A股反应',
'4. 模型估计:构建回归模型检验传导效应',
'5. 稳健性检验:更换窗口期、子样本分析'
],
'disclaimer': (
'以上研究框架仅供学术方法论参考。'
'任何实际投资决策需独立判断,本文不承担任何责任。'
)
}
九、完整Python代码实现
9.1 美股新闻获取类(整合版)
"""
美股热点新闻数据模块 - 完整实现
仅供技术学习和学术研究使用,不构成任何投资建议。
"""
import requests
import hashlib
import time
import json
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class USStockNewsModule:
"""
美股热点新闻数据模块 - 整合版
功能:数据采集、存储、情感分析、中概股映射、可视化
"""
# 核心美股标的列表(用于新闻采集,按板块分组)
TICKER_UNIVERSE = {
'mega_cap': ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'META', 'TSLA'],
'semiconductor': ['NVDA', 'AMD', 'INTC', 'TSM', 'ASML', 'AVGO', 'QCOM'],
'finance': ['JPM', 'BAC', 'GS', 'MS', 'V', 'MA'],
'healthcare': ['JNJ', 'UNH', 'PFE', 'ABBV', 'MRK', 'LLY'],
'energy': ['XOM', 'CVX', 'COP', 'SLB', 'EOG'],
'chinese_concept': ['BABA', 'PDD', 'JD', 'BIDU', 'NIO', 'LI', 'XPEV'],
'indices': ['SPY', 'QQQ', 'IWM', 'DIA']
}
def __init__(self, db_path: str = 'us_news.db',
finnhub_key: str = '', alpha_vantage_key: str = ''):
self.db_path = db_path
self.finnhub_key = finnhub_key
self.alpha_vantage_key = alpha_vantage_key
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'AcademicResearch/1.0 (Research Only)'
})
self._init_db()
def _init_db(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS us_stock_news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
news_id TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
summary TEXT,
source TEXT NOT NULL,
source_url TEXT,
publish_time TIMESTAMP NOT NULL,
collect_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
related_tickers TEXT,
news_type TEXT,
sector TEXT,
sentiment_score REAL,
sentiment_label TEXT,
keywords TEXT,
importance INTEGER DEFAULT 0,
language TEXT DEFAULT 'en',
is_duplicate INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)''')
c.execute('''CREATE TABLE IF NOT EXISTS chinese_concept_mapping (
id INTEGER PRIMARY KEY AUTOINCREMENT,
us_ticker TEXT UNIQUE NOT NULL,
us_name TEXT,
hk_ticker TEXT,
hk_name TEXT,
cn_ticker TEXT,
cn_name TEXT,
sector TEXT,
note TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)''')
# 创建索引
c.execute('CREATE INDEX IF NOT EXISTS idx_news_time ON us_stock_news(publish_time)')
c.execute('CREATE INDEX IF NOT EXISTS idx_news_type ON us_stock_news(news_type)')
c.execute('CREATE INDEX IF NOT EXISTS idx_news_sentiment ON us_stock_news(sentiment_score)')
conn.commit()
conn.close()
def save_news(self, news_list: List[Dict]):
"""批量保存新闻到数据库"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
saved_count = 0
for news in news_list:
try:
c.execute('''INSERT OR IGNORE INTO us_stock_news
(news_id, title, summary, source, source_url, publish_time,
related_tickers, news_type, sector, sentiment_score,
sentiment_label, keywords, language)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(
news.get('news_id', ''),
news.get('title', ''),
news.get('summary', ''),
news.get('source', ''),
news.get('source_url', ''),
news.get('publish_time', ''),
news.get('related_tickers', ''),
news.get('news_type', ''),
news.get('sector', ''),
news.get('sentiment_score'),
news.get('sentiment_label'),
json.dumps(news.get('keywords', []), ensure_ascii=False),
news.get('language', 'en')
)
)
if c.rowcount > 0:
saved_count += 1
except Exception as e:
print(f"[错误] 保存新闻失败: {e}")
conn.commit()
conn.close()
print(f"[信息] 保存 {saved_count}/{len(news_list)} 条新闻到数据库")
def fetch_all_news_90d(self) -> List[Dict]:
"""获取近90个交易日的全部美股新闻"""
all_news = []
# 1. 通过yfinance获取主要个股新闻
try:
import yfinance as yf
all_tickers = []
for group_tickers in self.TICKER_UNIVERSE.values():
all_tickers.extend(group_tickers)
all_tickers = list(set(all_tickers))
for ticker in all_tickers:
try:
t = yf.Ticker(ticker)
news = t.news
if news:
for item in news:
news_id = hashlib.md5(
f"{item.get('title', '')}{item.get('publisher', '')}".encode()
).hexdigest()
all_news.append({
'news_id': news_id,
'title': item.get('title', ''),
'summary': '',
'source': item.get('publisher', ''),
'source_url': item.get('link', ''),
'publish_time': datetime.fromtimestamp(
item.get('providerPublishTime', 0)
).isoformat(),
'related_tickers': ticker,
'news_type': 'stock',
'language': 'en'
})
time.sleep(0.3)
except Exception as e:
print(f"[错误] {ticker}: {e}")
except ImportError:
print("[提示] 未安装yfinance: pip install yfinance")
# 2. 通过Finnhub获取市场新闻
if self.finnhub_key:
start_date, end_date = self.get_trading_days_90()
finnhub_news = self.fetch_finnhub_market_news(start_date, end_date)
all_news.extend(finnhub_news)
# 3. 去重
deduplicator = NewsDeduplicator()
all_news = deduplicator.deduplicate_list(all_news)
# 4. 保存
self.save_news(all_news)
return all_news
def get_trading_days_90(self) -> tuple:
"""计算近90个交易日的日期范围"""
end_date = datetime.now()
start_date = end_date - timedelta(days=130)
return start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')
def fetch_finnhub_market_news(self, start_date: str, end_date: str) -> List[Dict]:
"""通过Finnhub获取市场新闻"""
if not self.finnhub_key:
return []
all_news = []
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
chunk_days = 30
current_start = start_dt
while current_start < end_dt:
current_end = min(current_start + timedelta(days=chunk_days), end_dt)
url = (
f"https://finnhub.io/api/v1/company-news?"
f"symbol=SPY&from={current_start.strftime('%Y-%m-%d')}"
f"&to={current_end.strftime('%Y-%m-%d')}"
f"&token={self.finnhub_key}"
)
try:
resp = self.session.get(url, timeout=15)
if resp.status_code == 200:
for item in resp.json():
all_news.append({
'news_id': hashlib.md5(
f"{item.get('headline', '')}{item.get('datetime', 0)}".encode()
).hexdigest(),
'title': item.get('headline', ''),
'summary': item.get('summary', ''),
'source': item.get('source', ''),
'source_url': item.get('url', ''),
'publish_time': datetime.fromtimestamp(
item.get('datetime', 0)
).isoformat(),
'related_tickers': ','.join(
item.get('related', '').split(',')[:5]
) if item.get('related') else '',
'news_type': 'market',
'language': 'en'
})
except Exception as e:
print(f"[错误] Finnhub: {e}")
current_start = current_end
time.sleep(1)
return all_news
def query_news(self, days: int = 7, news_type: str = None,
ticker: str = None, min_sentiment: float = None) -> List[Dict]:
"""查询数据库中的新闻"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
query = "SELECT * FROM us_stock_news WHERE 1=1"
params = []
if days > 0:
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
query += " AND publish_time >= ?"
params.append(cutoff)
if news_type:
query += " AND news_type = ?"
params.append(news_type)
if ticker:
query += " AND related_tickers LIKE ?"
params.append(f'%{ticker}%')
if min_sentiment is not None:
query += " AND sentiment_score >= ?"
params.append(min_sentiment)
query += " ORDER BY publish_time DESC LIMIT 500"
c.execute(query, params)
columns = [desc[0] for desc in c.description]
results = [dict(zip(columns, row)) for row in c.fetchall()]
conn.close()
return results
def get_stats(self) -> Dict:
"""获取数据统计"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
stats = {}
c.execute("SELECT COUNT(*) FROM us_stock_news")
stats['total_news'] = c.fetchone()[0]
c.execute("SELECT news_type, COUNT(*) FROM us_stock_news GROUP BY news_type")
stats['by_type'] = dict(c.fetchall())
c.execute("SELECT MIN(publish_time), MAX(publish_time) FROM us_stock_news")
row = c.fetchone()
stats['date_range'] = {'earliest': row[0], 'latest': row[1]}
c.execute("SELECT AVG(sentiment_score) FROM us_stock_news WHERE sentiment_score IS NOT NULL")
stats['avg_sentiment'] = round(c.fetchone()[0], 4) if c.fetchone()[0] else 0
conn.close()
return stats
9.2 中概股映射类
class ChineseConceptMapper:
"""中概股映射类(仅供学术研究参考)"""
MAPPING = {
'BABA': {'name': 'Alibaba', 'hk': '9988.HK', 'sector': '电商'},
'PDD': {'name': 'PDD Holdings', 'hk': None, 'sector': '社交电商'},
'JD': {'name': 'JD.com', 'hk': '9618.HK', 'sector': '电商'},
'BIDU': {'name': 'Baidu', 'hk': '9888.HK', 'sector': 'AI/搜索'},
'NIO': {'name': 'NIO', 'hk': '9866.HK', 'sector': '新能源车'},
'LI': {'name': 'Li Auto', 'hk': '2015.HK', 'sector': '新能源车'},
'XPEV': {'name': 'XPeng', 'hk': '9868.HK', 'sector': '新能源车'},
'NETEASE': {'name': 'NetEase', 'hk': '9999.HK', 'sector': '游戏'},
'EDU': {'name': 'New Oriental', 'hk': '9901.HK', 'sector': '教育'},
}
SECTOR_MAPPING = {
'电商': {'cn_concepts': ['直播电商', '跨境电商', '新零售']},
'新能源车': {'cn_concepts': ['锂电池', '智能驾驶', '充电桩']},
'AI/搜索': {'cn_concepts': ['大模型', '自动驾驶', '云计算']},
'游戏': {'cn_concepts': ['游戏出海', '元宇宙', 'AI+游戏']},
'教育': {'cn_concepts': ['职业教育', '教育信息化']},
}
def get_cn_sector_impact(self, us_ticker: str) -> Optional[Dict]:
"""
获取中概股对A股行业的影响映射(仅供学术研究)
"""
info = self.MAPPING.get(us_ticker.upper())
if not info:
return None
sector = info['sector']
cn_concepts = self.SECTOR_MAPPING.get(sector, {}).get('cn_concepts', [])
return {
'us_ticker': us_ticker,
'company': info['name'],
'sector': sector,
'hk_ticker': info['hk'],
'related_cn_concepts': cn_concepts,
'disclaimer': '以上映射仅供学术研究参考,不构成投资建议'
}
9.3 情感分析类
class SentimentAnalyzer:
"""情感分析类(VADER简化版,无需额外依赖)"""
def __init__(self):
# 金融领域情感词典(简化版)
self.positive_words = {
'surge', 'jump', 'rally', 'beat', 'exceed', 'growth', 'profit',
'upgrade', 'outperform', 'bullish', 'strong', 'record', 'gain',
'innovate', 'breakthrough', 'dividend', 'buyback', 'acquire'
}
self.negative_words = {
'drop', 'fall', 'crash', 'miss', 'decline', 'loss', 'cut',
'downgrade', 'underperform', 'bearish', 'weak', 'slump',
'warning', 'investigation', 'recall', 'lawsuit', 'debt'
}
self.intensifiers = {'very', 'extremely', 'highly', 'significantly'}
self.negators = {'not', 'no', 'never', 'despite', 'although'}
def analyze(self, text: str) -> Dict:
"""基于词典的情感分析"""
words = text.lower().split()
score = 0.0
count = 0
for i, word in enumerate(words):
word_clean = word.strip('.,!?;:')
if word_clean in self.positive_words:
multiplier = 1.5 if any(
words[j] in self.intensifiers for j in range(max(0, i-2), i)
) else 1.0
if any(words[j] in self.negators for j in range(max(0, i-2), i)):
multiplier *= -0.5
score += multiplier
count += 1
elif word_clean in self.negative_words:
multiplier = 1.5 if any(
words[j] in self.intensifiers for j in range(max(0, i-2), i)
) else 1.0
if any(words[j] in self.negators for j in range(max(0, i-2), i)):
multiplier *= -0.5
score -= multiplier
count += 1
if count > 0:
normalized = max(-1.0, min(1.0, score / count))
else:
normalized = 0.0
return {
'sentiment_score': round(normalized, 4),
'sentiment_label': (
'positive' if normalized >= 0.1
else 'negative' if normalized <= -0.1
else 'neutral'
),
'matched_terms': count,
'method': 'dictionary_based'
}
9.4 数据存储类
class NewsStorageManager:
"""数据存储管理类"""
def __init__(self, db_path: str = 'us_news.db'):
self.db_path = db_path
def export_to_json(self, output_path: str, days: int = 90):
"""导出新闻为JSON文件"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
c = conn.cursor()
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
c.execute(
"SELECT * FROM us_stock_news WHERE publish_time >= ? ORDER BY publish_time DESC",
(cutoff,)
)
news_list = [dict(row) for row in c.fetchall()]
conn.close()
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(news_list, f, ensure_ascii=False, indent=2, default=str)
print(f"[信息] 导出 {len(news_list)} 条新闻到 {output_path}")
def export_to_csv(self, output_path: str, days: int = 90):
"""导出新闻为CSV文件"""
import csv
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
c = conn.cursor()
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
c.execute(
"SELECT * FROM us_stock_news WHERE publish_time >= ? ORDER BY publish_time DESC",
(cutoff,)
)
columns = [desc[0] for desc in c.description]
rows = c.fetchall()
conn.close()
with open(output_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
for row in rows:
writer.writerow(dict(row))
print(f"[信息] 导出 {len(rows)} 条新闻到 {output_path}")
def get_daily_summary(self, date: str) -> Dict:
"""获取指定日期的新闻摘要统计"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute(
"""SELECT news_type, COUNT(*) as count,
AVG(sentiment_score) as avg_sentiment
FROM us_stock_news
WHERE publish_time LIKE ?
GROUP BY news_type""",
(f"{date}%",)
)
summary = {}
for row in c.fetchall():
summary[row[0]] = {
'count': row[1],
'avg_sentiment': round(row[2], 4) if row[2] else 0
}
conn.close()
return summary
9.5 可视化类
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # 无GUI后端
# 设置中文字体(如系统无中文字体会回退到英文)
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
class NewsVisualizer:
"""新闻数据可视化类"""
def plot_sentiment_timeline(self, news_list: List[Dict], output_path: str):
"""
绘制新闻情感时间线图
"""
if not news_list:
print("[警告] 无数据可绘制")
return
# 按日期聚合情感得分
daily_sentiment = {}
daily_count = {}
for news in news_list:
date = news.get('publish_time', '')[:10]
score = news.get('sentiment_score', 0)
if date and score is not None:
daily_sentiment[date] = daily_sentiment.get(date, 0) + score
daily_count[date] = daily_count.get(date, 0) + 1
dates = sorted(daily_sentiment.keys())
avg_scores = [
daily_sentiment[d] / daily_count[d] for d in dates
]
fig, ax = plt.subplots(figsize=(14, 6))
colors = ['#2ecc71' if s >= 0 else '#e74c3c' for s in avg_scores]
ax.bar(range(len(dates)), avg_scores, color=colors, alpha=0.7)
ax.set_xticks(range(0, len(dates), max(1, len(dates) // 10)))
ax.set_xticklabels([dates[i] for i in range(0, len(dates), max(1, len(dates) // 10))],
rotation=45, ha='right')
ax.axhline(y=0, color='gray', linestyle='--', linewidth=0.5)
ax.set_title('US Stock News Sentiment Timeline (Academic Research Only)',
fontsize=14)
ax.set_xlabel('Date')
ax.set_ylabel('Average Sentiment Score')
ax.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[信息] 情感时间线图已保存: {output_path}")
def plot_news_type_distribution(self, news_list: List[Dict], output_path: str):
"""绘制新闻类型分布饼图"""
from collections import Counter
type_counts = Counter(n.get('news_type', 'unknown') for n in news_list)
fig, ax = plt.subplots(figsize=(8, 8))
labels = list(type_counts.keys())
sizes = list(type_counts.values())
colors = ['#3498db', '#e74c3c', '#2ecc71', '#f39c12', '#9b59b6']
ax.pie(sizes, labels=labels, colors=colors[:len(labels)],
autopct='%1.1f%%', startangle=90)
ax.set_title('News Type Distribution')
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[信息] 类型分布图已保存: {output_path}")
def plot_ticker_news_heatmap(self, news_list: List[Dict], output_path: str):
"""绘制个股新闻数量热力图"""
import numpy as np
from collections import Counter, defaultdict
# 按日期和股票聚合
ticker_date_count = defaultdict(lambda: defaultdict(int))
for news in news_list:
date = news.get('publish_time', '')[:10]
tickers = news.get('related_tickers', '').split(',')
for t in tickers:
t = t.strip()
if t:
ticker_date_count[t][date] += 1
# 取新闻最多的前15只股票
ticker_totals = {
t: sum(counts.values()) for t, counts in ticker_date_count.items()
}
top_tickers = sorted(ticker_totals, key=ticker_totals.get, reverse=True)[:15]
if not top_tickers:
print("[警告] 无数据可绘制热力图")
return
dates = sorted(set(
d for counts in ticker_date_count.values() for d in counts
))
data = np.zeros((len(top_tickers), len(dates)))
for i, ticker in enumerate(top_tickers):
for j, date in enumerate(dates):
data[i][j] = ticker_date_count[ticker].get(date, 0)
fig, ax = plt.subplots(figsize=(16, 8))
im = ax.imshow(data, aspect='auto', cmap='YlOrRd')
ax.set_xticks(range(0, len(dates), max(1, len(dates) // 10)))
ax.set_xticklabels(
[dates[i] for i in range(0, len(dates), max(1, len(dates) // 10))],
rotation=45, ha='right'
)
ax.set_yticks(range(len(top_tickers)))
ax.set_yticklabels(top_tickers)
ax.set_title('Ticker-Date News Count Heatmap')
plt.colorbar(im, ax=ax, label='News Count')
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[信息] 热力图已保存: {output_path}")
9.6 主程序入口
def main():
"""
主程序入口 - 美股新闻数据模块演示
仅供技术学习和学术研究使用
"""
print("=" * 60)
print("美股热点新闻数据模块(技术学习版)")
print("声明:本程序仅供学术研究,不构成任何投资建议")
print("=" * 60)
# 初始化模块
module = USStockNewsModule(
db_path='us_stock_news.db',
finnhub_key='YOUR_FINNHUB_KEY', # 替换为实际Key
alpha_vantage_key='YOUR_AV_KEY' # 替换为实际Key
)
# 获取近90日新闻
print("\n[步骤1] 获取近90个交易日美股新闻...")
news_list = module.fetch_all_news_90d()
print(f"共获取 {len(news_list)} 条新闻")
# 情感分析
print("\n[步骤2] 执行情感分析...")
analyzer = SentimentAnalyzer()
for news in news_list:
result = analyzer.analyze(news.get('title', ''))
news['sentiment_score'] = result['sentiment_score']
news['sentiment_label'] = result['sentiment_label']
# 更新数据库中的情感得分
module.save_news(news_list)
# 数据统计
print("\n[步骤3] 数据统计...")
stats = module.get_stats()
print(f"总新闻数: {stats['total_news']}")
print(f"类型分布: {stats['by_type']}")
# 中概股映射
print("\n[步骤4] 中概股映射分析(仅供学术研究)...")
mapper = ChineseConceptMapper()
for ticker in ['BABA', 'NIO', 'BIDU']:
impact = mapper.get_cn_sector_impact(ticker)
if impact:
print(f" {ticker} ({impact['company']}): "
f"行业={impact['sector']}, "
f"港股={impact['hk_ticker']}, "
f"A股概念={impact['related_cn_concepts']}")
# 导出数据
print("\n[步骤5] 导出数据...")
storage = NewsStorageManager('us_stock_news.db')
storage.export_to_json('us_news_90d.json')
storage.export_to_csv('us_news_90d.csv')
# 可视化
print("\n[步骤6] 生成可视化图表...")
visualizer = NewsVisualizer()
visualizer.plot_sentiment_timeline(news_list, 'sentiment_timeline.png')
visualizer.plot_news_type_distribution(news_list, 'news_type_dist.png')
visualizer.plot_ticker_news_heatmap(news_list, 'ticker_news_heatmap.png')
print("\n" + "=" * 60)
print("处理完成!所有数据仅供学术研究参考。")
print("免责声明:不构成任何投资建议,投资需谨慎。")
print("=" * 60)
if __name__ == '__main__':
main()
十、合规与法律注意事项
10.1 美股数据版权
- Yahoo Finance数据:通过yfinance获取的数据受Yahoo Terms of Service约束,仅供个人非商业研究使用
- Finnhub数据:免费版API数据可用于非商业用途,商业使用需购买授权
- Alpha Vantage数据:免费版有调用频率限制,需遵守其使用条款
- 财经媒体内容:Bloomberg、Reuters、Seeking Alpha等媒体的原创内容受版权保护,爬取需获得授权
10.2 SEC合规要求
- 美国证券交易委员会(SEC)对证券市场数据使用有严格规定
- 不得利用非公开信息进行交易(内幕交易)
- 不得散布虚假或误导性市场信息(市场操纵)
- 量化交易策略需符合SEC和FINRA相关法规
10.3 跨境数据传输合规
- 中国《数据安全法》《个人信息保护法》对数据出境有明确规定
- 金融数据可能涉及国家经济安全,需评估数据分类分级
- 学术研究中的数据使用需遵守所在机构的伦理审查要求
- 跨境传输美股数据用于研究用途,建议咨询法律专业人士
10.4 不得用于非法证券活动
- 严禁将本模块用于任何形式的实际证券交易决策
- 严禁基于本模块数据向他人提供投资建议
- 严禁将分析结果用于荐股、喊单等非法证券活动
- 严禁将数据用于市场操纵、内幕交易等违法行为
- 违反上述规定可能面临刑事和民事法律责任
十一、免责声明与风险提示
重要免责声明:
-
非投资建议:本文档及所有代码、数据、分析结果仅供技术学习和学术研究使用,不构成任何形式的投资建议、投资咨询或证券交易指导。
-
投资风险自担:股市有风险,投资需谨慎。任何基于本文档内容进行的投资决策,均由投资者自行承担全部风险和责任。
-
数据准确性:本文档中引用的数据来源于公开渠道,作者不对数据的准确性、完整性、及时性作出任何保证。
-
合规责任:使用者有责任确保其使用行为符合所在司法管辖区的法律法规,包括但不限于美国SEC法规、中国证监会法规等。
-
版权声明:各数据源的原始数据版权归原数据提供方所有,本文档仅做技术整合和学术研究用途。
-
美股监管:美股市场受美国SEC监管,跨境投资需遵守相关法律法规。中国投资者参与美股交易需遵守国家外汇管理局和证监会相关规定。
-
技术风险:代码可能存在Bug或安全漏洞,使用者应在充分理解代码逻辑的前提下使用,作者不对代码运行结果承担任何责任。
-
学术引用:如将本文档内容用于学术论文或研究报告,请注明"仅供学术研究参考,不构成投资建议"。
文档版本:v3.0 | 最后更新:2026年6月 | 用途:技术学习与学术研究
最后再次提醒:本文档所有内容仅供技术学习和学术研究参考。投资有风险,入市需谨慎。请务必在充分了解风险的基础上,做出独立的投资决策。