具体脚本如下:
import pyodbc
import requests
import json
import mysql.connector
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional, Union, Dict, Tuple, Any
class DatabaseConnection(ABC):
"""数据库连接的抽象基类"""
@abstractmethod
def connect(self):
pass
@abstractmethod
def disconnect(self):
pass
@abstractmethod
def execute_query(self, query: str, params: Optional[Tuple[Any, ...]] = None):
pass
class MySQLConnection(DatabaseConnection):
"""MySQL连接类"""
def __init__(self, host: str, database: str, username: str, password: str, port: int = 3306):
self.host = host
self.database = database
self.username = username
self.password = password
self.port = port
self.connection = None
self.cursor = None
def connect(self):
self.connection = mysql.connector.connect(
host=self.host,
database=self.database,
user=self.username,
password=self.password,
port=self.port
)
self.cursor = self.connection.cursor()
def disconnect(self):
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
def execute_query(self, query: str, params: Optional[Tuple[Any, ...]] = None):
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
return self.cursor.fetchall()
except mysql.connector.Error as e:
raise Exception(f"MySQL查询错误: {str(e)}")
class VehicleDataService:
"""车辆数据服务类"""
def __init__(self, db_connection: DatabaseConnection):
self.db_connection = db_connection
self.db_connection.connect()
def __del__(self):
try:
if hasattr(self, 'db_connection'):
self.db_connection.disconnect()
except:
pass
class TrafficDataPrint:
"""车流数据打印和推送服务"""
def __init__(self, traffic_service: VehicleDataService, webhook_url: str):
self.traffic_service = traffic_service
self.webhook_url = webhook_url
def format_traffic_message(self, date: str, count: int) -> Dict:
"""格式化车流数据消息"""
return {
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": "车流数据统计",
"content": [
[
{
"tag": "text",
"text": f"📊 车流统计报告\n\n"
},
{
"tag": "text",
"text": f"📅 统计日期:{date}\n"
},
{
"tag": "text",
"text": f"🚗 车流总量:{count:,} 辆次\n"
}
]
]
}
}
}
}
def send_to_feishu(self, message: Dict) -> bool:
"""发送消息到飞书"""
try:
headers = {
'Content-Type': 'application/json'
}
response = requests.post(
self.webhook_url,
headers=headers,
data=json.dumps(message)
)
if response.status_code == 200:
result = response.json()
if result.get('code') == 0:
return True
else:
print(f"发送失败: {result.get('msg')}")
return False
else:
print(f"HTTP错误: {response.status_code}")
return False
except Exception as e:
print(f"发送消息时发生错误: {str(e)}")
return False
def get_and_send_traffic_data(self, date: Optional[str] = None) -> bool:
"""获取并发送车流数据"""
try:
if date is None:
yesterday = datetime.now() - timedelta(days=1)
date = yesterday.strftime('%Y-%m-%d')
# 修改为使用车流查询方法
query = """
SELECT COUNT(CarCode)
FROM tc_usercrdtm
WHERE DATE(Crdtm) = DATE(%s)
AND ChannelID IN ('27','28','37')
"""
results = self.traffic_service.db_connection.execute_query(query, (date,))
if not results or results[0][0] is None:
print(f"未找到 {date} 的车流数据")
return False
traffic_count = results[0][0]
message = self.format_traffic_message(date, traffic_count)
return self.send_to_feishu(message)
except Exception as e:
print(f"处理车流数据时发生错误: {str(e)}")
return False
def main():
mysql_config = {
'host': '127.0.0.1',
'database': 'acs_parking_video',
'username': '****', #立方数据用户名
'password': '****', #立方数据库密码
'port': 3306
}
# 飞书webhook配置
webhook_url = "******"
try:
# 创建MySQL连接
db_connection = MySQLConnection(**mysql_config)
# 创建车流数据服务
traffic_service = VehicleDataService(db_connection)
# 创建消息推送服务
traffic_printer = TrafficDataPrint(traffic_service, webhook_url)
# 获取并发送昨天的车流数据
success = traffic_printer.get_and_send_traffic_data()
if success:
print("车流数据已成功发送到飞书")
else:
print("车流数据发送失败")
except Exception as e:
print(f"错误: {str(e)}")
if __name__ == "__main__":
main()