180 lines
6.7 KiB
Python
180 lines
6.7 KiB
Python
from langchain.tools import BaseTool
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional, Type, Literal, ClassVar
|
||
import datetime
|
||
import pytz
|
||
import logging
|
||
|
||
logger = logging.getLogger("datetime_tool")
|
||
|
||
# 定义输入参数模型(使用Pydantic替代原get_parameters())
|
||
class DateTimeInput(BaseModel):
|
||
operation: Literal["current_time", "timezone_convert", "date_diff", "add_time", "format_date"] = Field(
|
||
description="操作类型: current_time(当前时间), timezone_convert(时区转换), "
|
||
"date_diff(日期差), add_time(时间加减), format_date(格式化日期)"
|
||
)
|
||
timezone: Optional[str] = Field(
|
||
default="UTC",
|
||
description="时区名称 (e.g., 'UTC', 'Asia/Shanghai')"
|
||
)
|
||
date_string: Optional[str] = Field(
|
||
description="日期字符串 (格式: YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"
|
||
)
|
||
target_timezone: Optional[str] = Field(
|
||
description="目标时区(用于时区转换)"
|
||
)
|
||
days: Optional[int] = Field(
|
||
default=0,
|
||
description="要加减的天数"
|
||
)
|
||
hours: Optional[int] = Field(
|
||
default=0,
|
||
description="要加减的小时数"
|
||
)
|
||
format: Optional[str] = Field(
|
||
default="%Y-%m-%d %H:%M:%S",
|
||
description="日期格式字符串 (e.g., '%Y-%m-%d %H:%M:%S')"
|
||
)
|
||
|
||
class DateTimeTool(BaseTool):
|
||
"""日期时间操作工具(支持时区转换、日期计算等)"""
|
||
|
||
name: ClassVar[str] = "datetime_tool"
|
||
description: ClassVar[str] = """执行日期时间相关操作,包括:
|
||
- 获取当前时间
|
||
- 时区转换
|
||
- 计算日期差
|
||
- 日期时间加减
|
||
- 格式化日期
|
||
使用时必须指定operation参数确定操作类型。"""
|
||
args_schema: Type[BaseModel] = DateTimeInput
|
||
|
||
def _parse_datetime(self, date_string: str, timezone_str: str = "UTC") -> datetime.datetime:
|
||
"""解析日期字符串(私有方法)"""
|
||
tz = pytz.timezone(timezone_str)
|
||
formats = [
|
||
"%Y-%m-%d %H:%M:%S",
|
||
"%Y-%m-%d %H:%M",
|
||
"%Y-%m-%d",
|
||
"%Y/%m/%d %H:%M:%S",
|
||
"%Y/%m/%d",
|
||
"%d/%m/%Y %H:%M:%S",
|
||
"%d/%m/%Y",
|
||
"%m/%d/%Y %H:%M:%S",
|
||
"%m/%d/%Y"
|
||
]
|
||
|
||
for fmt in formats:
|
||
try:
|
||
dt = datetime.datetime.strptime(date_string, fmt)
|
||
return tz.localize(dt)
|
||
except ValueError:
|
||
continue
|
||
raise ValueError(f"无法解析日期字符串: {date_string}")
|
||
|
||
def _run(self,
|
||
operation: str,
|
||
timezone: str = "UTC",
|
||
date_string: Optional[str] = None,
|
||
target_timezone: Optional[str] = None,
|
||
days: int = 0,
|
||
hours: int = 0,
|
||
format: str = "%Y-%m-%d %H:%M:%S") -> dict:
|
||
"""同步执行日期时间操作"""
|
||
logger.info(f"执行日期时间操作: {operation}")
|
||
|
||
try:
|
||
if operation == "current_time":
|
||
tz = pytz.timezone(timezone)
|
||
now = datetime.datetime.now(tz)
|
||
return {
|
||
"status": "success",
|
||
"result": {
|
||
"formatted": now.strftime(format),
|
||
"iso": now.isoformat(),
|
||
"timestamp": now.timestamp(),
|
||
"timezone": timezone
|
||
},
|
||
"summary": f"当前时间 ({timezone}): {now.strftime(format)}"
|
||
}
|
||
|
||
elif operation == "timezone_convert":
|
||
if not date_string or not target_timezone:
|
||
raise ValueError("必须提供date_string和target_timezone参数")
|
||
|
||
source_dt = self._parse_datetime(date_string, timezone)
|
||
target_dt = source_dt.astimezone(pytz.timezone(target_timezone))
|
||
|
||
return {
|
||
"status": "success",
|
||
"result": {
|
||
"source": source_dt.strftime(format),
|
||
"target": target_dt.strftime(format),
|
||
"source_tz": timezone,
|
||
"target_tz": target_timezone
|
||
},
|
||
"summary": f"时区转换: {source_dt.strftime(format)} → {target_dt.strftime(format)}"
|
||
}
|
||
|
||
elif operation == "date_diff":
|
||
if not date_string:
|
||
raise ValueError("必须提供date_string参数")
|
||
|
||
target_dt = self._parse_datetime(date_string, timezone)
|
||
current_dt = datetime.datetime.now(pytz.timezone(timezone))
|
||
delta = target_dt - current_dt
|
||
|
||
return {
|
||
"status": "success",
|
||
"result": {
|
||
"days": delta.days,
|
||
"hours": delta.seconds // 3600,
|
||
"total_seconds": delta.total_seconds(),
|
||
"is_future": delta.days > 0
|
||
},
|
||
"summary": f"日期差: {abs(delta.days)}天 {delta.seconds//3600}小时"
|
||
}
|
||
|
||
elif operation == "add_time":
|
||
base_dt = self._parse_datetime(date_string, timezone) if date_string \
|
||
else datetime.datetime.now(pytz.timezone(timezone))
|
||
new_dt = base_dt + datetime.timedelta(days=days, hours=hours)
|
||
|
||
return {
|
||
"status": "success",
|
||
"result": {
|
||
"original": base_dt.strftime(format),
|
||
"new": new_dt.strftime(format),
|
||
"delta": f"{days}天 {hours}小时"
|
||
},
|
||
"summary": f"时间计算: {base_dt.strftime(format)} + {days}天 {hours}小时 = {new_dt.strftime(format)}"
|
||
}
|
||
|
||
elif operation == "format_date":
|
||
dt = self._parse_datetime(date_string, timezone) if date_string \
|
||
else datetime.datetime.now(pytz.timezone(timezone))
|
||
formatted = dt.strftime(format)
|
||
|
||
return {
|
||
"status": "success",
|
||
"result": {
|
||
"original": dt.isoformat(),
|
||
"formatted": formatted
|
||
},
|
||
"summary": f"格式化结果: {formatted}"
|
||
}
|
||
|
||
else:
|
||
raise ValueError(f"未知操作类型: {operation}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"操作失败: {str(e)}")
|
||
return {
|
||
"status": "error",
|
||
"message": str(e),
|
||
"operation": operation
|
||
}
|
||
|
||
async def _arun(self, **kwargs):
|
||
"""异步执行"""
|
||
return self._run(**kwargs) |