脚本如下:
import pyodbc import requests import json import mysql.connector from abc import ABC, abstractmethod from datetime import datetime, timedelta from typing import Optional, Union, Dict class DatabaseConnection(ABC): """数据库连接的抽象基类""" @abstractmethod def connect(self): pass @abstractmethod def disconnect(self): pass @abstractmethod def execute_query(self, query: str, params: tuple = None): pass class SQLServerConnection(DatabaseConnection): """SQL Server连接类""" def __init__(self, server: str, database: str, username: str, password: str): self.server = server self.database = database self.username = username self.password = password self.connection = None self.cursor = None def connect(self): conn_str = f'DRIVER={{SQL Server}};SERVER={self.server};DATABASE={self.database};UID={self.username};PWD={self.password}' self.connection = pyodbc.connect(conn_str) self.cursor = self.connection.cursor() def disconnect(self): if self.cursor: self.cursor.close() if self.connection: self.connection.close() def execute_query(self, query: str, params: tuple = None): try: if params: self.cursor.execute(query, params) else: self.cursor.execute(query) return self.cursor.fetchall() except pyodbc.Error as e: raise Exception(f"SQL Server查询错误: {str(e)}") class MySQLConnection(DatabaseConnection): """MySQL连接类""" def __init__(self, host: str, database: str, username: str, password: str, port: int = 3306): self.host = host self.database = database self.username = username self.password = password self.port = port self.connection = None self.cursor = None def connect(self): self.connection = mysql.connector.connect( host=self.host, database=self.database, user=self.username, password=self.password, port=self.port ) self.cursor = self.connection.cursor() def disconnect(self): if self.cursor: self.cursor.close() if self.connection: self.connection.close() def execute_query(self, query: str, params: tuple = None): try: if params: self.cursor.execute(query, params) else: self.cursor.execute(query) return self.cursor.fetchall() except mysql.connector.Error as e: raise Exception(f"MySQL查询错误: {str(e)}") class TrafficDataService: """客流数据服务类""" def __init__(self, db_connection: DatabaseConnection): self.db_connection = db_connection self.db_connection.connect() def __del__(self): self.db_connection.disconnect() def get_traffic_data(self, date: str, additional_params: Dict = None) -> Optional[int]: """ 查询指定日期的客流数据 Args: date: 日期字符串,格式为'YYYY-MM-DD' additional_params: 额外的查询参数字典 Returns: 客流数量,如果没有数据返回None """ try: # 基础SQL查询 query = """ SELECT [CalculateTraffic] FROM [IPVA_S0400_B1_SUB_2025].[dbo].[Summary_Day] t WHERE t.CountDate = ? """ # 如果有额外参数,添加到查询条件中 if additional_params: for key, value in additional_params.items(): query += f" AND t.{key} = ?" params = (date,) + tuple(additional_params.values()) else: params = (date,) results = self.db_connection.execute_query(query, params) if results and results[0][0] is not None: return results[0][0] return None except Exception as e: raise Exception(f"查询客流数据时发生错误: {str(e)}") """ 查询指定日期的车辆数量 Args: date: 日期字符串,格式为'YYYY-MM-DD' Returns: 车辆数量,如果没有数据返回None """ try: query = """ SELECT COUNT(CarCode) FROM tc_usercrdtm WHERE DATE(Crdtm) = DATE(%s) AND ChannelID IN ('27','37','38') """ results = self.db_connection.execute_query(query, (date,)) if results and results[0][0] is not None: return results[0][0] return None except Exception as e: raise Exception(f"查询车辆数据时发生错误: {str(e)}") class TrafficDataPrint: """客流数据打印和推送服务""" def __init__(self, traffic_service, webhook_url: str): self.traffic_service = traffic_service self.webhook_url = webhook_url def format_traffic_message(self, date: str, traffic_count: int) -> str: """ 格式化客流数据消息 """ return { "msg_type": "post", "content": { "post": { "zh_cn": { "title": "日客流数据统计", "content": [ [ { "tag": "text", "text": f"📊 客流统计报告\n\n" }, { "tag": "text", "text": f"📅 统计日期:{date}\n" }, { "tag": "text", "text": f"👥 客流总量:{traffic_count:,} 人次\n" } ] ] } } } } def send_to_feishu(self, message: Dict) -> bool: """ 发送消息到飞书机器人 """ try: headers = { 'Content-Type': 'application/json' } response = requests.post( self.webhook_url, headers=headers, data=json.dumps(message) ) if response.status_code == 200: result = response.json() if result.get('code') == 0: return True else: print(f"发送失败: {result.get('msg')}") return False else: print(f"HTTP错误: {response.status_code}") return False except Exception as e: print(f"发送消息时发生错误: {str(e)}") return False def get_and_send_traffic_data(self, date: str = None) -> bool: """ 获取并发送客流数据 Args: date: 指定日期,格式为'YYYY-MM-DD',如果为None则使用昨天的日期 """ try: # 如果没有指定日期,使用昨天的日期 if date is None: yesterday = datetime.now() - timedelta(days=1) date = yesterday.strftime('%Y-%m-%d') # 获取客流数据 traffic_count = self.traffic_service.get_traffic_data(date) if traffic_count is None: print(f"未找到 {date} 的客流数据") return False # 格式化消息 message = self.format_traffic_message(date, traffic_count) # 发送到飞书 return self.send_to_feishu(message) except Exception as e: print(f"处理客流数据时发生错误: {str(e)}") return False def main(): # 数据库配置 sqlserver_config = { 'server': '10.**.**.***', #替换本地服务器地址 'database': 'IPVA_S0400_B1_SUB_2025', 'username': '***', #替换本地账号 'password': '******' #替换本地密码 } # 飞书webhook地址 webhook_url = "***********" try: # 创建数据库连接 db_connection = SQLServerConnection(**sqlserver_config) # 创建客流数据服务 traffic_service = TrafficDataService(db_connection) # 创建客流数据打印服务 traffic_printer = TrafficDataPrint(traffic_service, webhook_url) # 获取并发送昨天的客流数据 success = traffic_printer.get_and_send_traffic_data() if success: print("客流数据已成功发送到飞书") else: print("客流数据发送失败") except Exception as e: print(f"错误: {str(e)}") if __name__ == "__main__": main()