系统环境
系统 macOS 26.0.1
本次学习使用的项目
环境搭建
虚拟环境搭建
cd pyworkspace/news_server
uv venv
uv init
创建一个虚拟环境以后,手动安装fastmcp,原项目中使用 mcp-server-fastmcp,依赖找不到,可能是变更了。目前使用如下方式可以解决
uv add fastmcp
# 清理 requirements.txt 中的问题包
sed -i '/mcp-server-fastmcp/d' requirements.txt # Linux/Mac
# 或手动编辑文件 ,删除requirements.txt 中的mcp-server-fastmcp并保存文件
# 安装依赖
uv pip install -r requirements.txt
# 查看依赖结果
uv pip list
本地启动
点开 Server.py文件,替换 本地启动测试;更推荐作为服务使用 参考该文档不同传输模式对比
mcp.run(transport="streamable-http")

docekr 部署
当前项目需要修改的点如下,修改的目的,docker 中部署指定 host 和 port
server.py 中
1.
mcp = FastMCP("NewsServer", host="0.0.0.0", port=8000)
2.
# 使用stdio传输
# Bind to 0.0.0.0 so the service is reachable from outside the container
# and ensure it listens on port 8000 (the container's exposed port).
# FastMCP.run forwards args to the underlying ASGI server (uvicorn),
# so passing host/port here makes the containerized service accessible.
try:
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
except TypeError:
# If the FastMCP.run implementation doesn't accept host/port keyword args,
# fall back to the default call so the module still runs.
mcp.run(transport="streamable-http")
import asyncio
import httpx
import feedparser
from datetime import datetime
from typing import List, Dict
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("NewsServer", host="0.0.0.0", port=8000)
class NewsFetcher:
def __init__(self):
self.client = None
self.news_sources = {
"tech": [
"https://www.ithome.com/rss/",
],
"general": [
"http://www.people.com.cn/rss/politics.xml"
],
"international": [
"http://rss.cnn.com/rss/edition.rss",
"https://feeds.bbci.co.uk/news/world/rss.xml"
]
}
async def initialize(self):
"""初始化HTTP客户端"""
if self.client is None:
self.client = httpx.AsyncClient(
timeout=30.0,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
)
return True
async def fetch_hot_news(self, category: str = "general", limit: int = 20) -> List[Dict]:
"""获取热点新闻"""
await self.initialize()
news_list = []
sources = self.news_sources.get(category, self.news_sources["general"])
# 并发获取所有RSS源
tasks = [self._fetch_rss_feed(source) for source in sources[:3]]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
continue
news_list.extend(result)
# 去重和排序
news_list = self._deduplicate_news(news_list)
news_list.sort(key=lambda x: x.get('time', ''), reverse=True)
return news_list[:limit]
async def _fetch_rss_feed(self, rss_url: str) -> List[Dict]:
"""获取单个RSS源的内容"""
try:
response = await self.client.get(rss_url)
feed = feedparser.parse(response.content)
news_items = []
for entry in feed.entries[:10]: # 每个源最多10条
news_item = {
'title': entry.title,
'url': entry.link,
'source': feed.feed.get('title', '未知来源'),
'time': self._parse_time(entry.get('published', '')),
'summary': entry.get('summary', '')[:200]
}
news_items.append(news_item)
return news_items
except Exception as e:
print(f"获取RSS源失败 {rss_url}: {e}")
return []
async def search_news(self, keyword: str, limit: int = 10) -> List[Dict]:
"""搜索新闻"""
try:
# 先获取一些新闻,然后过滤
all_news = await self.fetch_hot_news("general", 50)
filtered_news = [
news for news in all_news
if keyword.lower() in news['title'].lower()
]
return filtered_news[:limit]
except Exception as e:
print(f"搜索新闻失败: {e}")
return []
def _parse_time(self, time_str: str) -> str:
"""解析时间字符串"""
if not time_str:
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time_formats = [
'%a, %d %b %Y %H:%M:%S %Z',
'%a, %d %b %Y %H:%M:%S %z',
'%Y-%m-%dT%H:%M:%S%z',
'%Y-%m-%d %H:%M:%S'
]
for fmt in time_formats:
try:
dt = datetime.strptime(time_str, fmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
continue
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def _deduplicate_news(self, news_list: List[Dict]) -> List[Dict]:
"""去重新闻"""
seen_titles = set()
unique_news = []
for news in news_list:
title = news['title'].lower().strip()
if title not in seen_titles:
seen_titles.add(title)
unique_news.append(news)
return unique_news
async def close(self):
"""关闭客户端"""
if self.client:
await self.client.aclose()
self.client = None
# 创建全局实例
news_fetcher = NewsFetcher()
def _format_news(news_list: list) -> str:
"""格式化新闻输出"""
if not news_list:
return "📰 没有找到相关新闻"
result = ["📰 最新新闻摘要:", ""]
for i, news in enumerate(news_list, 1):
result.append(f"{i}. **{news.get('title', '无标题')}**")
if news.get('source'):
result.append(f" 📍 来源: {news['source']}")
if news.get('time'):
result.append(f" ⏰ 时间: {news['time']}")
if news.get('summary'):
result.append(f" 📝 摘要: {news['summary']}")
if news.get('url'):
result.append(f" 🔗 链接: {news['url']}")
result.append("")
return "
".join(result)
@mcp.tool()
async def get_hot_news(category: str = "general", limit: int = 10) -> str:
"""获取热点新闻
Args:
category: 新闻分类 (general-综合, tech-科技, international-国际)
limit: 返回新闻数量 (1-20)
"""
try:
# 参数验证
if limit > 20:
limit = 20
if limit < 1:
limit = 5
valid_categories = ["general", "tech", "international"]
if category not in valid_categories:
category = "general"
news_list = await news_fetcher.fetch_hot_news(category, limit)
return _format_news(news_list)
except Exception as e:
return f"❌ 获取新闻失败: {str(e)}"
@mcp.tool()
async def search_news(keyword: str, limit: int = 10) -> str:
"""搜索新闻
Args:
keyword: 搜索关键词
limit: 返回结果数量 (1-10)
"""
try:
if limit > 10:
limit = 10
if limit < 1:
limit = 5
if not keyword or len(keyword.strip()) == 0:
return "请输入有效的搜索关键词"
news_list = await news_fetcher.search_news(keyword.strip(), limit)
return _format_news(news_list)
except Exception as e:
return f"❌ 搜索新闻时出错: {str(e)}"
@mcp.tool()
async def list_news_categories() -> str:
"""列出可用的新闻分类"""
categories_info = """
📊 可用的新闻分类:
1. **general** - 综合新闻 (人民网等)
2. **tech** - 科技新闻 (IT之家等)
3. **international** - 国际新闻 (CNN、BBC等)
使用示例: get_hot_news(category="tech", limit=5)
"""
return categories_info
if __name__ == "__main__":
print("🚀 新闻MCP服务器启动中...")
print("📋 可用工具:")
print(" - get_hot_news: 获取热点新闻")
print(" - search_news: 搜索新闻")
print(" - list_news_categories: 列出新闻分类")
# 使用stdio传输
# Bind to 0.0.0.0 so the service is reachable from outside the container
# and ensure it listens on port 8000 (the container's exposed port).
# FastMCP.run forwards args to the underlying ASGI server (uvicorn),
# so passing host/port here makes the containerized service accessible.
try:
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
except TypeError:
# If the FastMCP.run implementation doesn't accept host/port keyword args,
# fall back to the default call so the module still runs.
mcp.run(transport="streamable-http")
dockerfile 需要修改内容
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app/
EXPOSE 8000
# Run the server module directly (minimal change). The module `news_server.server`
# has a runnable `__main__` section using mcp.run, so run it as a module.
CMD ["python", "-m", "news_server.server"]
云服务器部署
docker 打包,我这边服务器使用的阿里云,centos 7;本地是 Mac,需要指定环境打包
docker build --platform linux/amd64 -t news-app .
将镜像输出到指定位置
docker save -o /Users/xxx/pyworkspace/news_server/news-app.tar news-app
上传到云服务器后加载 tar 包
docker load -i news-app.tar
查询 镜像包加载完成
docker load -i news-app.tar
启动镜像包
docker run -d --name news-app -p 8000:8000 news-app
查看日志
docker logs news-app
本地验证
curl -v http://localhost:8000/mcp
远程验证
curl -v http://你的 ip:8000/mcp
其余问题排除
防火墙
# 停止防火墙服务(临时)
sudo systemctl stop firewalld
# 启动防火墙服务
sudo systemctl start firewalld
# 在两个区域都添加8000端口
sudo firewall-cmd --zone=public --permanent --add-port=8000/tcp
sudo firewall-cmd --zone=docker --permanent --add-port=8000/tcp
# 重新加载配置
sudo firewall-cmd --reload
# 验证配置
echo "=== Public区域端口 ==="
sudo firewall-cmd --zone=public --list-ports
echo "=== Docker区域端口 ==="
sudo firewall-cmd --zone=docker --list-ports
云服务器端口开放->排查端口是否正确;本次打包运行端口都为 8000,云服务器默认开的 80不一样;
随便找一个 MCP 客户端即可测试;















暂无评论内容