python-MCPServer拉取和使用部署

系统环境

系统 macOS 26.0.1
本次学习使用的项目

环境搭建

虚拟环境搭建


cd pyworkspace/news_server
uv venv
uv init 

创建一个虚拟环境以后,手动安装fastmcp,原项目中使用 mcp-server-fastmcp,依赖找不到,可能是变更了。目前使用如下方式可以解决


uv add fastmcp

# 清理 requirements.txt 中的问题包
sed -i '/mcp-server-fastmcp/d' requirements.txt  # Linux/Mac
# 或手动编辑文件 ,删除requirements.txt  中的mcp-server-fastmcp并保存文件
# 安装依赖
uv pip install -r requirements.txt
# 查看依赖结果
uv pip list

本地启动

点开 Server.py文件,替换
mcp.run(transport="streamable-http")
本地启动测试;更推荐作为服务使用 参考该文档不同传输模式对比

docekr 部署

当前项目需要修改的点如下,修改的目的,docker 中部署指定 host 和 port
server.py 中
1.

mcp = FastMCP("NewsServer", host="0.0.0.0", port=8000)

2.


    
    # 使用stdio传输
    # Bind to 0.0.0.0 so the service is reachable from outside the container
    # and ensure it listens on port 8000 (the container's exposed port).
    # FastMCP.run forwards args to the underlying ASGI server (uvicorn),
    # so passing host/port here makes the containerized service accessible.
    try:
        mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
    except TypeError:
        # If the FastMCP.run implementation doesn't accept host/port keyword args,
        # fall back to the default call so the module still runs.
        mcp.run(transport="streamable-http")

import asyncio
import httpx
import feedparser
from datetime import datetime
from typing import List, Dict
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("NewsServer", host="0.0.0.0", port=8000)

class NewsFetcher:
    def __init__(self):
        self.client = None
        self.news_sources = {
            "tech": [
                "https://www.ithome.com/rss/",
            ],
            "general": [
                "http://www.people.com.cn/rss/politics.xml"
            ],
            "international": [
                "http://rss.cnn.com/rss/edition.rss",
                "https://feeds.bbci.co.uk/news/world/rss.xml"
            ]
        }

    async def initialize(self):
        """初始化HTTP客户端"""
        if self.client is None:
            self.client = httpx.AsyncClient(
                timeout=30.0,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            )
        return True

    async def fetch_hot_news(self, category: str = "general", limit: int = 20) -> List[Dict]:
        """获取热点新闻"""
        await self.initialize()
        news_list = []
        sources = self.news_sources.get(category, self.news_sources["general"])
        
        # 并发获取所有RSS源
        tasks = [self._fetch_rss_feed(source) for source in sources[:3]]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for result in results:
            if isinstance(result, Exception):
                continue
            news_list.extend(result)

        # 去重和排序
        news_list = self._deduplicate_news(news_list)
        news_list.sort(key=lambda x: x.get('time', ''), reverse=True)
        return news_list[:limit]

    async def _fetch_rss_feed(self, rss_url: str) -> List[Dict]:
        """获取单个RSS源的内容"""
        try:
            response = await self.client.get(rss_url)
            feed = feedparser.parse(response.content)
            news_items = []
            
            for entry in feed.entries[:10]:  # 每个源最多10条
                news_item = {
                    'title': entry.title,
                    'url': entry.link,
                    'source': feed.feed.get('title', '未知来源'),
                    'time': self._parse_time(entry.get('published', '')),
                    'summary': entry.get('summary', '')[:200]
                }
                news_items.append(news_item)
            return news_items
        except Exception as e:
            print(f"获取RSS源失败 {rss_url}: {e}")
            return []

    async def search_news(self, keyword: str, limit: int = 10) -> List[Dict]:
        """搜索新闻"""
        try:
            # 先获取一些新闻,然后过滤
            all_news = await self.fetch_hot_news("general", 50)
            filtered_news = [
                news for news in all_news
                if keyword.lower() in news['title'].lower()
            ]
            return filtered_news[:limit]
        except Exception as e:
            print(f"搜索新闻失败: {e}")
            return []

    def _parse_time(self, time_str: str) -> str:
        """解析时间字符串"""
        if not time_str:
            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        time_formats = [
            '%a, %d %b %Y %H:%M:%S %Z',
            '%a, %d %b %Y %H:%M:%S %z',
            '%Y-%m-%dT%H:%M:%S%z',
            '%Y-%m-%d %H:%M:%S'
        ]
        
        for fmt in time_formats:
            try:
                dt = datetime.strptime(time_str, fmt)
                return dt.strftime('%Y-%m-%d %H:%M:%S')
            except ValueError:
                continue
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _deduplicate_news(self, news_list: List[Dict]) -> List[Dict]:
        """去重新闻"""
        seen_titles = set()
        unique_news = []
        for news in news_list:
            title = news['title'].lower().strip()
            if title not in seen_titles:
                seen_titles.add(title)
                unique_news.append(news)
        return unique_news

    async def close(self):
        """关闭客户端"""
        if self.client:
            await self.client.aclose()
            self.client = None

# 创建全局实例
news_fetcher = NewsFetcher()

def _format_news(news_list: list) -> str:
    """格式化新闻输出"""
    if not news_list:
        return "📰 没有找到相关新闻"
    
    result = ["📰 最新新闻摘要:", ""]
    for i, news in enumerate(news_list, 1):
        result.append(f"{i}. **{news.get('title', '无标题')}**")
        if news.get('source'):
            result.append(f"   📍 来源: {news['source']}")
        if news.get('time'):
            result.append(f"   ⏰ 时间: {news['time']}")
        if news.get('summary'):
            result.append(f"   📝 摘要: {news['summary']}")
        if news.get('url'):
            result.append(f"   🔗 链接: {news['url']}")
        result.append("")
    
    return "
".join(result)

@mcp.tool()
async def get_hot_news(category: str = "general", limit: int = 10) -> str:
    """获取热点新闻
    
    Args:
        category: 新闻分类 (general-综合, tech-科技, international-国际)
        limit: 返回新闻数量 (1-20)
    """
    try:
        # 参数验证
        if limit > 20:
            limit = 20
        if limit < 1:
            limit = 5
            
        valid_categories = ["general", "tech", "international"]
        if category not in valid_categories:
            category = "general"
            
        news_list = await news_fetcher.fetch_hot_news(category, limit)
        return _format_news(news_list)
    except Exception as e:
        return f"❌ 获取新闻失败: {str(e)}"

@mcp.tool()
async def search_news(keyword: str, limit: int = 10) -> str:
    """搜索新闻
    
    Args:
        keyword: 搜索关键词
        limit: 返回结果数量 (1-10)
    """
    try:
        if limit > 10:
            limit = 10
        if limit < 1:
            limit = 5
            
        if not keyword or len(keyword.strip()) == 0:
            return "请输入有效的搜索关键词"
            
        news_list = await news_fetcher.search_news(keyword.strip(), limit)
        return _format_news(news_list)
    except Exception as e:
        return f"❌ 搜索新闻时出错: {str(e)}"

@mcp.tool()
async def list_news_categories() -> str:
    """列出可用的新闻分类"""
    categories_info = """
📊 可用的新闻分类:

1. **general** - 综合新闻 (人民网等)
2. **tech** - 科技新闻 (IT之家等)  
3. **international** - 国际新闻 (CNN、BBC等)

使用示例: get_hot_news(category="tech", limit=5)
"""
    return categories_info

if __name__ == "__main__":
    print("🚀 新闻MCP服务器启动中...")
    print("📋 可用工具:")
    print("   - get_hot_news: 获取热点新闻")
    print("   - search_news: 搜索新闻") 
    print("   - list_news_categories: 列出新闻分类")
    
    # 使用stdio传输
    # Bind to 0.0.0.0 so the service is reachable from outside the container
    # and ensure it listens on port 8000 (the container's exposed port).
    # FastMCP.run forwards args to the underlying ASGI server (uvicorn),
    # so passing host/port here makes the containerized service accessible.
    try:
        mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
    except TypeError:
        # If the FastMCP.run implementation doesn't accept host/port keyword args,
        # fall back to the default call so the module still runs.
        mcp.run(transport="streamable-http")

dockerfile 需要修改内容


FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ /app/

EXPOSE 8000

# Run the server module directly (minimal change). The module `news_server.server`
# has a runnable `__main__` section using mcp.run, so run it as a module.
CMD ["python", "-m", "news_server.server"]

云服务器部署

docker 打包,我这边服务器使用的阿里云,centos 7;本地是 Mac,需要指定环境打包

docker build --platform linux/amd64 -t news-app .

将镜像输出到指定位置

docker save -o /Users/xxx/pyworkspace/news_server/news-app.tar news-app

上传到云服务器后加载 tar 包

docker load -i news-app.tar

查询 镜像包加载完成

docker load -i news-app.tar

启动镜像包

docker run -d --name news-app -p 8000:8000 news-app

查看日志

docker logs news-app

本地验证

curl -v http://localhost:8000/mcp

远程验证

curl -v http://你的 ip:8000/mcp

其余问题排除

防火墙


# 停止防火墙服务(临时)
sudo systemctl stop firewalld

# 启动防火墙服务
sudo systemctl start firewalld

# 在两个区域都添加8000端口
sudo firewall-cmd --zone=public --permanent --add-port=8000/tcp
sudo firewall-cmd --zone=docker --permanent --add-port=8000/tcp

# 重新加载配置
sudo firewall-cmd --reload

# 验证配置
echo "=== Public区域端口 ==="
sudo firewall-cmd --zone=public --list-ports
echo "=== Docker区域端口 ==="
sudo firewall-cmd --zone=docker --list-ports

云服务器端口开放->排查端口是否正确;本次打包运行端口都为 8000,云服务器默认开的 80不一样;

随便找一个 MCP 客户端即可测试;

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
知寺小稚的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容