Aries的IT部落格

技术交流和分享

汇纳系统推送客流到飞书

脚本如下:

import pyodbc
import requests
import json
import mysql.connector
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Optional, Union, Dict

class DatabaseConnection(ABC):
    """数据库连接的抽象基类"""
   
    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def disconnect(self):
        pass

    @abstractmethod
    def execute_query(self, query: str, params: tuple = None):
        pass

class SQLServerConnection(DatabaseConnection):
    """SQL Server连接类"""
   
    def __init__(self, server: str, database: str, username: str, password: str):
        self.server = server
        self.database = database
        self.username = username
        self.password = password
        self.connection = None
        self.cursor = None

    def connect(self):
        conn_str = f'DRIVER={{SQL Server}};SERVER={self.server};DATABASE={self.database};UID={self.username};PWD={self.password}'
        self.connection = pyodbc.connect(conn_str)
        self.cursor = self.connection.cursor()

    def disconnect(self):
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    def execute_query(self, query: str, params: tuple = None):
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except pyodbc.Error as e:
            raise Exception(f"SQL Server查询错误: {str(e)}")

class MySQLConnection(DatabaseConnection):
    """MySQL连接类"""
   
    def __init__(self, host: str, database: str, username: str, password: str, port: int = 3306):
        self.host = host
        self.database = database
        self.username = username
        self.password = password
        self.port = port
        self.connection = None
        self.cursor = None

    def connect(self):
        self.connection = mysql.connector.connect(
            host=self.host,
            database=self.database,
            user=self.username,
            password=self.password,
            port=self.port
        )
        self.cursor = self.connection.cursor()

    def disconnect(self):
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    def execute_query(self, query: str, params: tuple = None):
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except mysql.connector.Error as e:
            raise Exception(f"MySQL查询错误: {str(e)}")

class TrafficDataService:
    """客流数据服务类"""
   
    def __init__(self, db_connection: DatabaseConnection):
        self.db_connection = db_connection
        self.db_connection.connect()

    def __del__(self):
        self.db_connection.disconnect()

    def get_traffic_data(self, date: str, additional_params: Dict = None) -> Optional[int]:
        """
        查询指定日期的客流数据
        
        Args:
            date: 日期字符串,格式为'YYYY-MM-DD'
            additional_params: 额外的查询参数字典
        
        Returns:
            客流数量,如果没有数据返回None
        """
        try:
            # 基础SQL查询
            query = """
            SELECT [CalculateTraffic]
            FROM [IPVA_S0400_B1_SUB_2025].[dbo].[Summary_Day] t 
            WHERE t.CountDate = ?
            """
           
            # 如果有额外参数,添加到查询条件中
            if additional_params:
                for key, value in additional_params.items():
                    query += f" AND t.{key} = ?"
                params = (date,) + tuple(additional_params.values())
            else:
                params = (date,)

            results = self.db_connection.execute_query(query, params)
           
            if results and results[0][0] is not None:
                return results[0][0]
            return None
           
        except Exception as e:
            raise Exception(f"查询客流数据时发生错误: {str(e)}")


        """
        查询指定日期的车辆数量
        
        Args:
            date: 日期字符串,格式为'YYYY-MM-DD'
        
        Returns:
            车辆数量,如果没有数据返回None
        """
        try:
            query = """
            SELECT COUNT(CarCode) 
            FROM tc_usercrdtm 
            WHERE DATE(Crdtm) = DATE(%s) 
            AND ChannelID IN ('27','37','38')
            """
           
            results = self.db_connection.execute_query(query, (date,))
           
            if results and results[0][0] is not None:
                return results[0][0]
            return None
           
        except Exception as e:
            raise Exception(f"查询车辆数据时发生错误: {str(e)}")

class TrafficDataPrint:
    """客流数据打印和推送服务"""
   
    def __init__(self, traffic_service, webhook_url: str):
        self.traffic_service = traffic_service
        self.webhook_url = webhook_url

    def format_traffic_message(self, date: str, traffic_count: int) -> str:
        """
        格式化客流数据消息
        """
        return {
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "日客流数据统计",
                        "content": [
                            [
                                {
                                    "tag": "text",
                                    "text": f"📊 客流统计报告\n\n"
                                },
                                {
                                    "tag": "text",
                                    "text": f"📅 统计日期:{date}\n"
                                },
                                {
                                    "tag": "text",
                                    "text": f"👥 客流总量:{traffic_count:,} 人次\n"
                                }
                            ]
                        ]
                    }
                }
            }
        }

    def send_to_feishu(self, message: Dict) -> bool:
        """
        发送消息到飞书机器人
        """
        try:
            headers = {
                'Content-Type': 'application/json'
            }
            response = requests.post(
                self.webhook_url,
                headers=headers,
                data=json.dumps(message)
            )
           
            if response.status_code == 200:
                result = response.json()
                if result.get('code') == 0:
                    return True
                else:
                    print(f"发送失败: {result.get('msg')}")
                    return False
            else:
                print(f"HTTP错误: {response.status_code}")
                return False
               
        except Exception as e:
            print(f"发送消息时发生错误: {str(e)}")
            return False

    def get_and_send_traffic_data(self, date: str = None) -> bool:
        """
        获取并发送客流数据
        
        Args:
            date: 指定日期,格式为'YYYY-MM-DD',如果为None则使用昨天的日期
        """
        try:
            # 如果没有指定日期,使用昨天的日期
            if date is None:
                yesterday = datetime.now() - timedelta(days=1)
                date = yesterday.strftime('%Y-%m-%d')

            # 获取客流数据
            traffic_count = self.traffic_service.get_traffic_data(date)
           
            if traffic_count is None:
                print(f"未找到 {date} 的客流数据")
                return False

            # 格式化消息
            message = self.format_traffic_message(date, traffic_count)
           
            # 发送到飞书
            return self.send_to_feishu(message)

        except Exception as e:
            print(f"处理客流数据时发生错误: {str(e)}")
            return False

def main():
    # 数据库配置
    sqlserver_config = {
        'server': '10.**.**.***', #替换本地服务器地址
        'database': 'IPVA_S0400_B1_SUB_2025',
        'username': '***',  #替换本地账号
        'password': '******' #替换本地密码
    }

    # 飞书webhook地址
    webhook_url = "***********"

    try:
        # 创建数据库连接
        db_connection = SQLServerConnection(**sqlserver_config)
       
        # 创建客流数据服务
        traffic_service = TrafficDataService(db_connection)
       
        # 创建客流数据打印服务
        traffic_printer = TrafficDataPrint(traffic_service, webhook_url)
       
        # 获取并发送昨天的客流数据
        success = traffic_printer.get_and_send_traffic_data()
       
        if success:
            print("客流数据已成功发送到飞书")
        else:
            print("客流数据发送失败")

    except Exception as e:
        print(f"错误: {str(e)}")

if __name__ == "__main__":
    main()


«    2025年4月    »
123456
78910111213
14151617181920
21222324252627
282930
控制面板
歡迎您造訪本網站!
  查看权限
网站分类
搜索
最新留言
    文章归档
    网站收藏
    友情链接

    Powered By Z-BlogPHP 1.7.2

    Mail to:hhesong@126.com. Copyright elecccom.cn.Some Rights Reserved.冀ICP备18030769号-1