脚本如下:
import pyodbc
import requests
import json
import mysql.connector
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional, Union, Dict
class DatabaseConnection(ABC):
    """Abstract base class for database connections.

    Concrete subclasses implement ``connect``, ``disconnect`` and
    ``execute_query``.  Instances can also be used as context managers::

        with SomeConnection(...) as conn:
            rows = conn.execute_query("SELECT 1")

    which guarantees ``disconnect()`` is called even if the body raises.
    """

    @abstractmethod
    def connect(self):
        """Open the underlying database connection."""

    @abstractmethod
    def disconnect(self):
        """Release the underlying database connection."""

    @abstractmethod
    def execute_query(self, query: str, params: Optional[tuple] = None):
        """Run ``query`` with optional positional ``params`` and return its rows."""

    def __enter__(self):
        """Connect and return the connection itself."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Disconnect; exceptions raised inside the ``with`` body propagate."""
        self.disconnect()
class SQLServerConnection(DatabaseConnection):
    """SQL Server connection backed by pyodbc."""

    def __init__(self, server: str, database: str, username: str, password: str,
                 driver: str = 'SQL Server'):
        """Store the connection settings; no connection is opened here.

        Args:
            server: Host name or address of the SQL Server instance.
            database: Database to open.
            username: Login name.
            password: Login password.
            driver: ODBC driver name placed in the connection string.  The
                default keeps the driver this class has always used.
        """
        self.server = server
        self.database = database
        self.username = username
        self.password = password
        self.driver = driver
        self.connection = None
        self.cursor = None

    def connect(self):
        """Open a pyodbc connection and create a cursor on it."""
        # Release any handles from an earlier connect() so they are not leaked.
        self.disconnect()
        conn_str = (
            f'DRIVER={{{self.driver}}};SERVER={self.server};'
            f'DATABASE={self.database};UID={self.username};PWD={self.password}'
        )
        self.connection = pyodbc.connect(conn_str)
        self.cursor = self.connection.cursor()

    def disconnect(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        cursor, connection = self.cursor, self.connection
        # Drop the references first so a later call (for example from a
        # finalizer) never tries to close handles that are already closed.
        self.cursor = None
        self.connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()

    def execute_query(self, query: str, params: Optional[tuple] = None):
        """Execute ``query`` and return every row of its result.

        Args:
            query: SQL text; the visible caller uses ``?`` placeholders.
            params: Values bound to the placeholders, or None/empty for none.

        Returns:
            Whatever ``cursor.fetchall()`` returns for the statement.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            Exception: If pyodbc reports an error; the original error is
                chained as ``__cause__``.
        """
        if self.cursor is None:
            raise RuntimeError("SQL Server查询错误: 尚未建立数据库连接")
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except pyodbc.Error as e:
            raise Exception(f"SQL Server查询错误: {str(e)}") from e
class MySQLConnection(DatabaseConnection):
    """MySQL connection backed by mysql.connector."""

    def __init__(self, host: str, database: str, username: str, password: str, port: int = 3306):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Host name or address of the MySQL server.
            database: Database (schema) to open.
            username: Login name.
            password: Login password.
            port: TCP port of the server.
        """
        self.host = host
        self.database = database
        self.username = username
        self.password = password
        self.port = port
        self.connection = None
        self.cursor = None

    def connect(self):
        """Open a mysql.connector connection and create a cursor on it."""
        # Release any handles from an earlier connect() so they are not leaked.
        self.disconnect()
        self.connection = mysql.connector.connect(
            host=self.host,
            database=self.database,
            user=self.username,
            password=self.password,
            port=self.port,
        )
        self.cursor = self.connection.cursor()

    def disconnect(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        cursor, connection = self.cursor, self.connection
        # Drop the references first so a repeated call is a harmless no-op.
        self.cursor = None
        self.connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()

    def execute_query(self, query: str, params: Optional[tuple] = None):
        """Execute ``query`` and return every row of its result.

        Args:
            query: SQL text; the visible caller uses ``%s`` placeholders.
            params: Values bound to the placeholders, or None/empty for none.

        Returns:
            Whatever ``cursor.fetchall()`` returns for the statement.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            Exception: If mysql.connector reports an error; the original
                error is chained as ``__cause__``.
        """
        if self.cursor is None:
            raise RuntimeError("MySQL查询错误: 尚未建立数据库连接")
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except mysql.connector.Error as e:
            raise Exception(f"MySQL查询错误: {str(e)}") from e
class TrafficDataService:
    """Footfall data service built on top of a database connection.

    The connection is opened when the service is constructed.  It is released
    by ``close()`` or, as a fallback, when the service is garbage-collected.

    Note that the two queries use different SQL dialects:
    ``get_traffic_data`` uses ``?`` placeholders and bracket-quoted names,
    while ``get_vehicle_count`` uses ``%s`` placeholders and ``DATE()``.
    Each method therefore needs a connection whose driver accepts that style.
    """

    def __init__(self, db_connection: "DatabaseConnection"):
        """Keep ``db_connection`` and open it immediately."""
        self.db_connection = db_connection
        self._closed = False
        self.db_connection.connect()

    def close(self):
        """Disconnect from the database; repeated calls do nothing."""
        if getattr(self, "_closed", False):
            return
        self._closed = True
        connection = getattr(self, "db_connection", None)
        if connection is not None:
            connection.disconnect()

    def __del__(self):
        # Finalizers may run during interpreter shutdown, where cleanup can
        # fail for unrelated reasons; this is a best-effort fallback only.
        try:
            self.close()
        except Exception:
            pass

    @staticmethod
    def _is_safe_column_name(name) -> bool:
        """Return True if ``name`` is a plain ASCII identifier."""
        return isinstance(name, str) and name.isascii() and name.isidentifier()

    @staticmethod
    def _first_scalar(results):
        """Return the first column of the first row, or None if absent/NULL."""
        if results and results[0][0] is not None:
            return results[0][0]
        return None

    def get_traffic_data(self, date: str, additional_params: Optional[Dict] = None) -> Optional[int]:
        """Query the footfall figure for one day.

        Args:
            date: Date string in 'YYYY-MM-DD' format.
            additional_params: Optional mapping of column name to value; each
                entry adds an ``AND t.<column> = ?`` equality filter.

        Returns:
            The ``CalculateTraffic`` value of the first matching row, or None
            when there is no row or the value is NULL.

        Raises:
            ValueError: If a key of ``additional_params`` is not a plain
                identifier.  Column names cannot be bound as parameters, so
                they are validated before being placed into the SQL text.
            Exception: If the query fails; the underlying error is chained.
        """
        extra = additional_params or {}
        for column in extra:
            if not self._is_safe_column_name(column):
                raise ValueError(f"非法的查询字段名: {column!r}")
        try:
            # Base query; values are always bound, never interpolated.
            query = """
                SELECT [CalculateTraffic]
                FROM [IPVA_S0400_B1_SUB_2025].[dbo].[Summary_Day] t
                WHERE t.CountDate = ?
            """
            query += "".join(f" AND t.{column} = ?" for column in extra)
            params = (date, *extra.values())
            results = self.db_connection.execute_query(query, params)
            return self._first_scalar(results)
        except Exception as e:
            raise Exception(f"查询客流数据时发生错误: {str(e)}") from e

    # NOTE(review): the source contained this body without its ``def`` line,
    # which left it unreachable after the ``raise`` above.  The method name is
    # reconstructed from the docstring -- confirm against any external callers.
    def get_vehicle_count(self, date: str) -> Optional[int]:
        """Query the number of vehicles recorded on one day.

        Args:
            date: Date string in 'YYYY-MM-DD' format.

        Returns:
            ``COUNT(CarCode)`` from the first result row, or None when no row
            is returned or the value is NULL.

        Raises:
            Exception: If the query fails; the underlying error is chained.
        """
        try:
            query = """
                SELECT COUNT(CarCode)
                FROM tc_usercrdtm
                WHERE DATE(Crdtm) = DATE(%s)
                AND ChannelID IN ('27','37','38')
            """
            results = self.db_connection.execute_query(query, (date,))
            return self._first_scalar(results)
        except Exception as e:
            raise Exception(f"查询车辆数据时发生错误: {str(e)}") from e
class TrafficDataPrint:
    """Format the daily footfall figure and push it to a Feishu webhook."""

    def __init__(self, traffic_service, webhook_url: str, timeout: float = 10.0):
        """Store the collaborators.

        Args:
            traffic_service: Object exposing ``get_traffic_data(date)``.
            webhook_url: Feishu bot webhook URL that receives the message.
            timeout: Seconds to wait for the webhook before giving up, so a
                stalled endpoint cannot hang the job forever.
        """
        self.traffic_service = traffic_service
        self.webhook_url = webhook_url
        self.timeout = timeout

    def format_traffic_message(self, date: str, traffic_count: int) -> Dict:
        """Build the "post" message payload for the footfall report.

        Args:
            date: Report date shown in the message.
            traffic_count: Footfall total, rendered with thousands separators.

        Returns:
            A JSON-serialisable dict (not a string) for the webhook.
        """
        lines = [
            "📊 客流统计报告\n\n",
            f"📅 统计日期:{date}\n",
            f"👥 客流总量:{traffic_count:,} 人次\n",
        ]
        return {
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "日客流数据统计",
                        "content": [
                            [{"tag": "text", "text": line} for line in lines]
                        ],
                    }
                }
            },
        }

    def send_to_feishu(self, message: Dict) -> bool:
        """POST ``message`` to the webhook.

        Returns:
            True only when the HTTP status is 200 and the response body has
            ``code == 0``.  Every failure is printed and reported as False;
            this method does not raise.
        """
        try:
            response = requests.post(
                self.webhook_url,
                headers={'Content-Type': 'application/json'},
                data=json.dumps(message),
                timeout=self.timeout,
            )
            if response.status_code != 200:
                print(f"HTTP错误: {response.status_code}")
                return False
            result = response.json()
            if result.get('code') != 0:
                print(f"发送失败: {result.get('msg')}")
                return False
            return True
        except Exception as e:
            # Deliberately broad: callers rely on a boolean result, not on
            # exceptions, for network, timeout and decoding failures alike.
            print(f"发送消息时发生错误: {str(e)}")
            return False

    def get_and_send_traffic_data(self, date: Optional[str] = None) -> bool:
        """Fetch the footfall figure for ``date`` and send it to Feishu.

        Args:
            date: Date in 'YYYY-MM-DD' format; None means yesterday
                (local time).

        Returns:
            True if the message was delivered, False if no data was found or
            any step failed.
        """
        try:
            if date is None:
                yesterday = datetime.now() - timedelta(days=1)
                date = yesterday.strftime('%Y-%m-%d')
            traffic_count = self.traffic_service.get_traffic_data(date)
            if traffic_count is None:
                print(f"未找到 {date} 的客流数据")
                return False
            message = self.format_traffic_message(date, traffic_count)
            return self.send_to_feishu(message)
        except Exception as e:
            print(f"处理客流数据时发生错误: {str(e)}")
            return False
def main():
    """Send yesterday's footfall figure to the Feishu webhook.

    Builds the SQL Server connection, the data service and the printer, then
    prints whether the push succeeded.  Any exception is printed, not raised.
    """
    # Database settings
    sqlserver_config = dict(
        server='10.**.**.***',  # replace with your server address
        database='IPVA_S0400_B1_SUB_2025',
        username='***',  # replace with your account
        password='******',  # replace with your password
    )
    # Feishu webhook address
    webhook_url = "***********"
    try:
        # Wire up connection -> data service -> printer
        connection = SQLServerConnection(**sqlserver_config)
        service = TrafficDataService(connection)
        printer = TrafficDataPrint(service, webhook_url)
        # With no date given, yesterday's data is fetched and sent
        sent = printer.get_and_send_traffic_data()
        print("客流数据已成功发送到飞书" if sent else "客流数据发送失败")
    except Exception as e:
        print(f"错误: {e}")
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()