侧边栏壁纸
博主头像
沐竹雨 博主等级

简简单单就好。。

  • 累计撰写 18 篇文章
  • 累计创建 19 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

未命名文章

沐竹雨
2026-02-26 / 0 评论 / 0 点赞 / 1 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
请将以下代码保存为对应的文件路径。此结构清晰展示了软件的复杂度。
BDS_5G_Locator/
├── config/
│   ├── __init__.py
│   ├── settings.py          # 系统配置
│   └── logging_config.py    # 日志配置
├── core/
│   ├── __init__.py
│   ├── algorithms.py        # 核心融合算法 (卡尔曼滤波等)
│   ├── geofence.py          # 电子围栏逻辑
│   └── data_parser.py       # 协议解析 (NMEA, MQTT)
├── hardware/
│   ├── __init__.py
│   ├── arm_driver.py        # 定制ARM底层驱动模拟
│   ├── gnss_module.py       # 北斗模块交互
│   └── five_g_module.py     # 5G模块交互
├── models/
│   ├── __init__.py
│   ├── database.py          # 数据库连接池
│   └── schemas.py           # 数据模型定义
├── services/
│   ├── __init__.py
│   ├── location_service.py  # 定位业务逻辑
│   ├── alert_service.py     # 报警服务
│   └── report_service.py    # 报表生成
├── api/
│   ├── __init__.py
│   ├── routes.py            # RESTful API路由
│   └── middleware.py        # 中间件
├── utils/
│   ├── __init__.py
│   ├── helpers.py           # 通用工具函数
│   └── crypto.py            # 加密解密
├── tests/
│   ├── __init__.py
│   ├── test_algorithms.py   # 算法测试
│   └── test_hardware.py     # 硬件模拟测试
├── main.py                  # 程序入口
├── requirements.txt         # 依赖列表
└── README.md                # 说明文档

1. 配置与基础设置 (config/)
config/settings.py
"""
系统全局配置文件
基于5G+北斗的人员定位系统 - 配置中心
版本: 1.0.0
作者: 系统开发组
"""

import os
from typing import List, Dict, Any
from dataclasses import dataclass, field

@dataclass
class DatabaseConfig:
    """数据库配置类"""
    driver: str = "sqlite"
    host: str = "localhost"
    port: int = 3306
    user: str = "root"
    password: str = os.getenv("DB_PASSWORD", "secure_password_123")
    database: str = "bds_5g_locator"
    pool_size: int = 10
    max_overflow: int = 20

    @property
    def connection_string(self) -> str:
        if self.driver == "sqlite":
            return f"sqlite:///./{self.database}.db"
        return f"{self.driver}://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

@dataclass
class HardwareConfig:
    """定制ARM硬件配置类"""
    # 串口配置
    gnss_port: str = "/dev/ttyUSB0"
    gnss_baudrate: int = 115200
    five_g_port: str = "/dev/ttyUSB2"
    five_g_baudrate: int = 921600
    
    # GPIO引脚定义 (模拟定制ARM板卡)
    led_status_pin: int = 18
    sos_button_pin: int = 23
    power_mgmt_pin: int = 24
    
    # 采样频率
    sampling_rate_hz: int = 5  # 5Hz
    timeout_seconds: float = 2.0

@dataclass
class AlgorithmConfig:
    """定位算法配置类"""
    # 卡尔曼滤波参数
    kf_process_noise: float = 0.001
    kf_measurement_noise: float = 0.01
    
    # 融合权重
    bds_weight_outdoor: float = 0.9
    bds_weight_indoor: float = 0.3
    five_g_weight_indoor: float = 0.7
    
    # 精度阈值
    accuracy_threshold_meters: float = 2.0
    h_dop_threshold: float = 2.5

@dataclass
class SystemConfig:
    """系统总配置"""
    app_name: str = "BDS_5G_Personnel_Locator"
    version: str = "1.0.0"
    debug: bool = False
    log_level: str = "INFO"
    
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    hardware: HardwareConfig = field(default_factory=HardwareConfig)
    algorithm: AlgorithmConfig = field(default_factory=AlgorithmConfig)
    
    # 电子围栏默认配置
    default_geofence_radius: float = 100.0  # 米
    alert_check_interval: int = 1  # 秒

# 实例化全局配置对象
CONFIG = SystemConfig()

def get_config() -> SystemConfig:
    """获取全局配置单例"""
    return CONFIG
config/logging_config.py
"""
日志系统配置模块
实现多级日志记录、文件轮转、格式化输出
"""

import logging
import os
import sys
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from config.settings import CONFIG

class LoggerSetup:
    """日志初始化类"""
    
    def __init__(self, log_dir: str = "./logs"):
        self.log_dir = log_dir
        self.logger = logging.getLogger(CONFIG.app_name)
        self._setup_logger()

    def _setup_logger(self):
        """配置日志处理器和格式"""
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir)

        # 设置日志级别
        level = getattr(logging, CONFIG.log_level.upper(), logging.INFO)
        self.logger.setLevel(level)

        # 清除现有处理器
        if self.logger.hasHandlers():
            self.logger.handlers.clear()

        # 定义格式化器
        formatter = logging.Formatter(
            fmt='%(asctime)s | %(levelname)-8s | %(name)s | %(filename)s:%(lineno)d | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        # 控制台处理器
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        console_handler.setLevel(level)
        self.logger.addHandler(console_handler)

        # 文件处理器 (按大小轮转)
        file_handler = RotatingFileHandler(
            filename=os.path.join(self.log_dir, "system.log"),
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5,
            encoding='utf-8'
        )
        file_handler.setFormatter(formatter)
        file_handler.setLevel(level)
        self.logger.addHandler(file_handler)

        # 错误日志专用处理器
        error_handler = RotatingFileHandler(
            filename=os.path.join(self.log_dir, "error.log"),
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding='utf-8'
        )
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(formatter)
        self.logger.addHandler(error_handler)

    def get_logger(self) -> logging.Logger:
        """获取logger实例"""
        return self.logger

# 初始化日志
log_setup = LoggerSetup()
logger = log_setup.get_logger()

2. 核心算法模块 (core/)
core/algorithms.py
"""
核心定位算法模块
包含:卡尔曼滤波、加权最小二乘法、坐标转换、多源数据融合
这是本软件的核心知识产权部分
"""

import math
import numpy as np
from typing import Tuple, List, Dict, Optional
from dataclasses import dataclass
from config.settings import CONFIG
from config.logging_config import logger

@dataclass
class PositionData:
    """位置数据结构"""
    latitude: float
    longitude: float
    altitude: float
    accuracy: float
    timestamp: float
    source: str  # 'BDS', '5G', 'FUSED'
    hdop: float = 0.0
    satellites: int = 0

class KalmanFilter1D:
    """
    一维卡尔曼滤波器
    用于平滑单一维度(经度或纬度)的数据
    """
    def __init__(self, process_noise: float, measurement_noise: float):
        self.process_noise = process_noise
        self.measurement_noise = measurement_noise
        
        # 状态估计值
        self.x = 0.0
        # 误差协方差
        self.P = 1.0
        # 过程噪声协方差
        self.Q = process_noise
        # 测量噪声协方差
        self.R = measurement_noise
        # 控制输入矩阵 (此处为0)
        self.u = 0.0
        
        self.initialized = False

    def update(self, measurement: float) -> float:
        """
        更新滤波器状态并返回估计值
        :param measurement: 当前测量值
        :return: 滤波后的估计值
        """
        if not self.initialized:
            self.x = measurement
            self.initialized = True
            return measurement

        # 预测步骤
        # x_pred = x + u (此处u=0)
        x_pred = self.x
        P_pred = self.P + self.Q

        # 更新步骤
        # 卡尔曼增益 K = P_pred / (P_pred + R)
        K = P_pred / (P_pred + self.R)
        
        # 状态更新 x = x_pred + K * (z - x_pred)
        self.x = x_pred + K * (measurement - x_pred)
        
        # 协方差更新 P = (1 - K) * P_pred
        self.P = (1 - K) * P_pred

        return self.x

class KalmanFilter2D:
    """
    二维卡尔曼滤波器 (经纬度联合滤波)
    """
    def __init__(self, process_noise: float, measurement_noise: float):
        self.lat_filter = KalmanFilter1D(process_noise, measurement_noise)
        self.lon_filter = KalmanFilter1D(process_noise, measurement_noise)

    def update(self, lat: float, lon: float) -> Tuple[float, float]:
        """
        更新二维位置
        """
        smooth_lat = self.lat_filter.update(lat)
        smooth_lon = self.lon_filter.update(lon)
        return smooth_lat, smooth_lon

class CoordinateConverter:
    """
    坐标转换工具类
    支持 WGS84, GCJ02, BD09 转换 (模拟)
    以及 ECEF (地心地固坐标系) 与 ENU (站心坐标系) 转换
    """
    
    # 地球半径 (米)
    EARTH_RADIUS = 6378137.0
    
    @staticmethod
    def wgs84_to_ecef(lat: float, lon: float, alt: float) -> Tuple[float, float, float]:
        """
        将WGS84经纬高转换为ECEF直角坐标
        公式参考:GPS Theory and Practice
        """
        lat_rad = math.radians(lat)
        lon_rad = math.radians(lon)
        
        # 扁率
        f = 1 / 298.257223563
        e2 = 2 * f - f * f
        
        N = CoordinateConverter.EARTH_RADIUS / math.sqrt(1 - e2 * math.sin(lat_rad)**2)
        
        X = (N + alt) * math.cos(lat_rad) * math.cos(lon_rad)
        Y = (N + alt) * math.cos(lat_rad) * math.sin(lon_rad)
        Z = (N * (1 - e2) + alt) * math.sin(lat_rad)
        
        return X, Y, Z

    @staticmethod
    def ecef_to_enu(x: float, y: float, z: float, 
                    lat_ref: float, lon_ref: float, alt_ref: float) -> Tuple[float, float, float]:
        """
        将ECEF转换为以参考点为中心的ENU坐标
        用于计算相对距离
        """
        lat_ref_rad = math.radians(lat_ref)
        lon_ref_rad = math.radians(lon_ref)
        
        # 参考点ECEF
        x_ref, y_ref, z_ref = CoordinateConverter.wgs84_to_ecef(lat_ref, lon_ref, alt_ref)
        
        dx = x - x_ref
        dy = y - y_ref
        dz = z - z_ref
        
        # 旋转矩阵
        East = -math.sin(lon_ref_rad) * dx + math.cos(lon_ref_rad) * dy
        North = -math.sin(lat_ref_rad) * math.cos(lon_ref_rad) * dx \
                - math.sin(lat_ref_rad) * math.sin(lon_ref_rad) * dy \
                + math.cos(lat_ref_rad) * dz
        Up = math.cos(lat_ref_rad) * math.cos(lon_ref_rad) * dx \
             + math.cos(lat_ref_rad) * math.sin(lon_ref_rad) * dy \
             + math.sin(lat_ref_rad) * dz
             
        return East, North, Up

class FusionEngine:
    """
    5G+北斗多源数据融合引擎
    核心业务逻辑:根据信号质量动态调整权重
    """
    
    def __init__(self):
        self.kf = KalmanFilter2D(
            process_noise=CONFIG.algorithm.kf_process_noise,
            measurement_noise=CONFIG.algorithm.kf_measurement_noise
        )
        self.last_position: Optional[PositionData] = None

    def fuse(self, bds_data: Optional[Dict], five_g_data: Optional[Dict]) -> PositionData:
        """
        执行融合算法
        :param bds_data: 北斗原始数据字典
        :param five_g_data: 5G基站数据字典
        :return: 融合后的高精度位置对象
        """
        import time
        current_time = time.time()
        
        # 场景判断:室内/室外/遮挡
        is_outdoor = self._judge_environment(bds_data, five_g_data)
        
        target_lat = 0.0
        target_lon = 0.0
        source_tag = "UNKNOWN"
        accuracy = 999.0
        
        if is_outdoor and bds_data:
            # 室外优先使用北斗
            weight_bds = CONFIG.algorithm.bds_weight_outdoor
            weight_5g = 1.0 - weight_bds
            
            target_lat = bds_data['lat']
            target_lon = bds_data['lon']
            source_tag = "BDS_PRIMARY"
            accuracy = bds_data.get('hdop', 1.0) * 5.0 # 估算精度
            
            logger.debug(f"[融合] 室外模式,北斗权重: {weight_bds}")
            
        elif five_g_data and bds_data:
            # 混合模式 (城市峡谷/半室内)
            weight_bds = CONFIG.algorithm.bds_weight_indoor
            weight_5g = CONFIG.algorithm.five_g_weight_indoor
            
            # 加权平均
            target_lat = weight_bds * bds_data['lat'] + weight_5g * five_g_data['lat']
            target_lon = weight_bds * bds_data['lon'] + weight_5g * five_g_data['lon']
            source_tag = "FUSED_HYBRID"
            accuracy = 5.0 # 估算融合精度
            
            logger.debug(f"[融合] 混合模式,北斗:{weight_bds}, 5G:{weight_5g}")
            
        elif five_g_data:
            # 纯5G定位 (深室内)
            target_lat = five_g_data['lat']
            target_lon = five_g_data['lon']
            source_tag = "5G_ONLY"
            accuracy = 10.0 # 5G定位精度较低
            
        else:
            # 无信号,保持上一次位置或返回空
            if self.last_position:
                logger.warning("[融合] 无信号,使用上次位置保持")
                return self.last_position
            raise ValueError("No valid positioning data available")

        # 应用卡尔曼滤波平滑
        smooth_lat, smooth_lon = self.kf.update(target_lat, target_lon)
        
        result = PositionData(
            latitude=smooth_lat,
            longitude=smooth_lon,
            altitude=bds_data.get('alt', 0.0) if bds_data else 0.0,
            accuracy=accuracy,
            timestamp=current_time,
            source=source_tag,
            hdop=bds_data.get('hdop', 0.0) if bds_data else 0.0,
            satellites=bds_data.get('sats', 0) if bds_data else 0
        )
        
        self.last_position = result
        return result

    def _judge_environment(self, bds: Optional[Dict], five_g: Optional[Dict]) -> bool:
        """
        简单的环境判断逻辑
        如果北斗卫星数 > 8 且 HDOP < 2.5,认为是室外
        """
        if bds and bds.get('sats', 0) > 8 and bds.get('hdop', 10.0) < 2.5:
            return True
        return False
core/geofence.py
"""
电子围栏管理模块
功能:多边形围栏、圆形围栏、进出报警、滞留报警
"""

import math
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
from config.logging_config import logger

class AlertType(Enum):
    ENTER = "ENTER_ZONE"
    LEAVE = "LEAVE_ZONE"
    STAY = "STAY_TOO_LONG"
    SOS = "SOS_TRIGGERED"

@dataclass
class GeoPoint:
    lat: float
    lon: float

class GeofenceManager:
    """
    电子围栏管理器
    支持多种几何形状的围栏检测
    """
    
    def __init__(self):
        # 存储围栏数据: {fence_id: {'type': 'circle'|'poly', 'data': ..., 'alert_config': ...}}
        self.fences = {}
        # 人员状态追踪: {user_id: {'last_status': {}, 'enter_time': ...}}
        self.user_states = {}

    def add_circle_fence(self, fence_id: str, center: GeoPoint, radius: float, name: str = ""):
        """添加圆形围栏"""
        self.fences[fence_id] = {
            'type': 'circle',
            'center': center,
            'radius': radius,
            'name': name or f"Circle_{fence_id}",
            'alert_config': {'on_enter': True, 'on_leave': True}
        }
        logger.info(f"添加圆形围栏: {name} (ID: {fence_id})")

    def add_polygon_fence(self, fence_id: str, points: List[GeoPoint], name: str = ""):
        """添加多边形围栏"""
        self.fences[fence_id] = {
            'type': 'polygon',
            'points': points,
            'name': name or f"Poly_{fence_id}",
            'alert_config': {'on_enter': True, 'on_leave': True}
        }
        logger.info(f"添加多边形围栏: {name} (ID: {fence_id})")

    def check_point(self, user_id: str, lat: float, lon: float, timestamp: float) -> List[dict]:
        """
        检查某点是否在围栏内,并生成报警事件
        """
        alerts = []
        current_point = GeoPoint(lat, lon)
        
        if user_id not in self.user_states:
            self.user_states[user_id] = {
                'inside_fences': set(),
                'enter_times': {}
            }
        
        state = self.user_states[user_id]
        currently_inside = set()

        for fid, fence in self.fences.items():
            is_inside = False
            
            if fence['type'] == 'circle':
                is_inside = self._is_in_circle(current_point, fence['center'], fence['radius'])
            elif fence['type'] == 'polygon':
                is_inside = self._is_in_polygon(current_point, fence['points'])
            
            if is_inside:
                currently_inside.add(fid)
                
                # 进入检测
                if fid not in state['inside_fences']:
                    if fence['alert_config']['on_enter']:
                        alerts.append({
                            'type': AlertType.ENTER.value,
                            'user_id': user_id,
                            'fence_id': fid,
                            'fence_name': fence['name'],
                            'timestamp': timestamp,
                            'location': (lat, lon)
                        })
                        logger.warning(f"报警: 用户 {user_id} 进入围栏 {fence['name']}")
                    
                    state['enter_times'][fid] = timestamp
                
                # 滞留检测 (简单示例:超过300秒)
                enter_time = state['enter_times'].get(fid)
                if enter_time and (timestamp - enter_time) > 300:
                     if fence['alert_config'].get('on_stay', False): # 需配置开启
                        alerts.append({
                            'type': AlertType.STAY.value,
                            'user_id': user_id,
                            'fence_id': fid,
                            'duration': timestamp - enter_time,
                            'timestamp': timestamp
                        })
            else:
                # 离开检测
                if fid in state['inside_fences']:
                    if fence['alert_config']['on_leave']:
                        alerts.append({
                            'type': AlertType.LEAVE.value,
                            'user_id': user_id,
                            'fence_id': fid,
                            'fence_name': fence['name'],
                            'timestamp': timestamp,
                            'location': (lat, lon)
                        })
                        logger.warning(f"报警: 用户 {user_id} 离开围栏 {fence['name']}")
                    if fid in state['enter_times']:
                        del state['enter_times'][fid]

        # 更新状态
        left_fences = state['inside_fences'] - currently_inside
        state['inside_fences'] = currently_inside
        
        return alerts

    def _is_in_circle(self, point: GeoPoint, center: GeoPoint, radius: float) -> bool:
        """判断点是否在圆内 (Haversine公式计算距离)"""
        R = 6371000  # 地球半径 米
        dLat = math.radians(point.lat - center.lat)
        dLon = math.radians(point.lon - center.lon)
        a = math.sin(dLat/2) * math.sin(dLat/2) + \
            math.cos(math.radians(center.lat)) * math.cos(math.radians(point.lat)) * \
            math.sin(dLon/2) * math.sin(dLon/2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        distance = R * c
        return distance <= radius

    def _is_in_polygon(self, point: GeoPoint, polygon: List[GeoPoint]) -> bool:
        """射线法判断点是否在多边形内"""
        x, y = point.lon, point.lat
        inside = False
        n = len(polygon)
        
        p1x, p1y = polygon[0].lon, polygon[0].lat
        for i in range(n + 1):
            p2x, p2y = polygon[i % n].lon, polygon[i % n].lat
            if y > min(p1y, p2y):
                if y <= max(p1y, p2y):
                    if x <= max(p1x, p2x):
                        if p1y != p2y:
                            xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                        if p1x == p2x or x <= xinters:
                            inside = not inside
            p1x, p1y = p2x, p2y
        return inside

3. 硬件驱动层 (hardware/)
hardware/arm_driver.py
"""
定制ARM架构硬件抽象层 (HAL)
模拟与底层GPIO、UART、I2C的交互
在实际部署中,此处将替换为真实的 ctypes 调用或 wiringPi 库
"""

import time
import random
import threading
from typing import Optional, Dict
from config.settings import CONFIG
from config.logging_config import logger

class ARMGPIO:
    """模拟ARM GPIO操作"""
    def __init__(self):
        self.pin_states = {}
        logger.info("ARM GPIO 控制器初始化完成")

    def setup(self, pin: int, mode: str):
        """设置引脚模式"""
        self.pin_states[pin] = {'mode': mode, 'value': 0}
        logger.debug(f"GPIO Pin {pin} 设置为 {mode}")

    def output(self, pin: int, value: int):
        """输出高低电平"""
        if pin in self.pin_states:
            self.pin_states[pin]['value'] = value
            # 模拟硬件延迟
            time.sleep(0.001) 
        else:
            logger.error(f"GPIO Pin {pin} 未初始化")

    def input(self, pin: int) -> int:
        """读取引脚状态 (模拟SOS按钮)"""
        if pin == CONFIG.hardware.sos_button_pin:
            # 模拟随机按下SOS按钮 (概率 0.1%)
            return 1 if random.random() < 0.001 else 0
        return 0

class SerialPortSimulator:
    """模拟串口通信 (UART)"""
    def __init__(self, port: str, baudrate: int):
        self.port = port
        self.baudrate = baudrate
        self.is_open = False
        self.buffer = []
        logger.info(f"串口 {port} (@{baudrate}) 模拟器初始化")

    def open(self):
        self.is_open = True
        logger.debug(f"串口 {self.port} 已打开")

    def close(self):
        self.is_open = False
        logger.debug(f"串口 {self.port} 已关闭")

    def read_line(self) -> Optional[str]:
        """模拟读取一行数据"""
        if not self.is_open:
            return None
        
        # 模拟不同模块的数据返回
        if "ttyUSB0" in self.port: # GNSS
            return self._generate_nmea()
        elif "ttyUSB2" in self.port: # 5G
            return self._generate_5g_data()
        return None

    def _generate_nmea(self) -> str:
        """生成模拟的NMEA-0183句子"""
        # 随机微调坐标以模拟移动
        base_lat = 39.9042 + random.uniform(-0.0001, 0.0001)
        base_lon = 116.4074 + random.uniform(-0.0001, 0.0001)
        sats = random.randint(8, 14)
        hdop = random.uniform(0.8, 1.5)
        
        # GNGGA 格式
        return f"$GNGGA,120000.000,{base_lat*100:.4f},N,{base_lon*100:.4f},E,1,{sats},{hdop:.1f},50.0,M,0.0,M,,*FF"

    def _generate_5g_data(self) -> str:
        """生成模拟的5G模组AT响应或JSON数据"""
        rssi = random.randint(-90, -60)
        cell_id = f"5G_N78_{random.randint(100, 999)}"
        return f'{{"rssi": {rssi}, "cell_id": "{cell_id}", "ta": {random.randint(1, 20)}}}'

class HardwareManager:
    """
    硬件总控类
    统一管理GPIO和串口
    """
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(HardwareManager, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        
        self.gpio = ARMGPIO()
        self.gnss_serial = SerialPortSimulator(CONFIG.hardware.gnss_port, CONFIG.hardware.gnss_baudrate)
        self.five_g_serial = SerialPortSimulator(CONFIG.hardware.five_g_port, CONFIG.hardware.five_g_baudrate)
        
        self._init_hardware()
        self._initialized = True

    def _init_hardware(self):
        """初始化所有硬件接口"""
        try:
            # 初始化GPIO
            self.gpio.setup(CONFIG.hardware.led_status_pin, "OUTPUT")
            self.gpio.setup(CONFIG.hardware.sos_button_pin, "INPUT")
            
            # 打开串口
            self.gnss_serial.open()
            self.five_g_serial.open()
            
            self.gpio.output(CONFIG.hardware.led_status_pin, 1) # 点亮状态灯
            logger.info("✅ 定制ARM硬件初始化成功")
        except Exception as e:
            logger.error(f"❌ 硬件初始化失败: {e}")
            raise

    def read_gnss_data(self) -> Optional[str]:
        return self.gnss_serial.read_line()

    def read_5g_data(self) -> Optional[str]:
        return self.five_g_serial.read_line()

    def check_sos(self) -> bool:
        """检查SOS按钮是否被按下"""
        val = self.gpio.input(CONFIG.hardware.sos_button_pin)
        if val == 1:
            logger.critical("🚨 SOS 按钮被触发!")
            self.gpio.output(CONFIG.hardware.led_status_pin, 1) # 快闪模拟 (简化为常亮)
            return True
        return False

    def shutdown(self):
        """安全关闭硬件"""
        self.gpio.output(CONFIG.hardware.led_status_pin, 0)
        self.gnss_serial.close()
        self.five_g_serial.close()
        logger.info("硬件资源已释放")

4. 数据模型与持久化 (models/)
models/database.py
"""
数据库连接与管理模块
支持 SQLite (嵌入式) 和 MySQL (服务器端)
使用 SQLAlchemy ORM 风格 (简化版实现)
"""

import sqlite3
import json
import threading
from typing import List, Dict, Any, Optional
from contextlib import contextmanager
from config.settings import CONFIG
from config.logging_config import logger

class DatabaseManager:
    """单例数据库管理器"""
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(DatabaseManager, cls).__new__(cls)
                cls._instance._init_db()
            return cls._instance

    def _init_db(self):
        """初始化数据库连接和表结构"""
        self.db_path = CONFIG.database.connection_string.replace("sqlite:///", "")
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self._create_tables()
        logger.info(f"数据库初始化完成: {self.db_path}")

    def _create_tables(self):
        """创建必要的表结构"""
        cursor = self.conn.cursor()
        
        # 位置记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS location_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                timestamp REAL NOT NULL,
                latitude REAL NOT NULL,
                longitude REAL NOT NULL,
                altitude REAL,
                accuracy REAL,
                source_type TEXT,
                satellites INTEGER,
                hdop REAL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 报警记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alert_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                alert_type TEXT NOT NULL,
                fence_id TEXT,
                fence_name TEXT,
                location_data TEXT, -- JSON格式
                timestamp REAL NOT NULL,
                handled BOOLEAN DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 设备状态表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS device_status (
                device_id TEXT PRIMARY KEY,
                battery_level INTEGER,
                signal_strength INTEGER,
                last_heartbeat REAL,
                status TEXT
            )
        ''')
        
        self.conn.commit()
        logger.debug("数据库表结构检查完毕")

    @contextmanager
    def get_cursor(self):
        """获取游标上下文管理器"""
        cursor = self.conn.cursor()
        try:
            yield cursor
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logger.error(f"数据库事务错误: {e}")
            raise
        finally:
            cursor.close()

    def save_location(self, user_id: str, pos_data: Dict):
        """保存位置数据"""
        with self.get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO location_logs (user_id, timestamp, latitude, longitude, altitude, accuracy, source_type, satellites, hdop)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                user_id,
                pos_data['timestamp'],
                pos_data['latitude'],
                pos_data['longitude'],
                pos_data.get('altitude', 0),
                pos_data.get('accuracy', 0),
                pos_data.get('source', 'UNKNOWN'),
                pos_data.get('satellites', 0),
                pos_data.get('hdop', 0)
            ))

    def save_alert(self, user_id: str, alert_data: Dict):
        """保存报警数据"""
        with self.get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO alert_logs (user_id, alert_type, fence_id, fence_name, location_data, timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                user_id,
                alert_data['type'],
                alert_data.get('fence_id'),
                alert_data.get('fence_name'),
                json.dumps(alert_data.get('location', {})),
                alert_data['timestamp']
            ))

    def get_recent_locations(self, user_id: str, limit: int = 100) -> List[Dict]:
        """查询最近的位置记录"""
        with self.get_cursor() as cursor:
            cursor.execute('''
                SELECT * FROM location_logs WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?
            ''', (user_id, limit))
            rows = cursor.fetchall()
            return [dict(row) for row in rows]

    def close(self):
        """关闭连接"""
        if self.conn:
            self.conn.close()
            logger.info("数据库连接已关闭")

5. 服务层与主程序 (services/, main.py)
services/location_service.py
"""
定位业务服务层
协调硬件、算法和数据库
"""

import time
import threading
import json
from typing import Optional, Dict
from hardware.arm_driver import HardwareManager
from core.algorithms import FusionEngine, PositionData
from core.geofence import GeofenceManager, GeoPoint
from models.database import DatabaseManager
from config.settings import CONFIG
from config.logging_config import logger

class LocationService:
    """定位核心服务"""
    
    def __init__(self, user_id: str = "USER_001"):
        self.user_id = user_id
        self.hw = HardwareManager()
        self.fusion_engine = FusionEngine()
        self.db = DatabaseManager()
        self.geo_manager = GeofenceManager()
        
        self._setup_default_fences()
        self.running = False
        self.thread = None

    def _setup_default_fences(self):
        """初始化默认电子围栏"""
        # 办公区
        self.geo_manager.add_circle_fence(
            "FENCE_OFFICE", 
            GeoPoint(39.9042, 116.4074), 
            50.0, 
            "办公区域"
        )
        # 危险区
        self.geo_manager.add_circle_fence(
            "FENCE_DANGER", 
            GeoPoint(39.9050, 116.4080), 
            30.0, 
            "高危作业区"
        )

    def start(self):
        """启动定位服务线程"""
        if self.running:
            logger.warning("服务已在运行中")
            return
        
        self.running = True
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()
        logger.info(f"🚀 定位服务已启动 (用户: {self.user_id})")

    def stop(self):
        """停止服务"""
        self.running = False
        if self.thread:
            self.thread.join()
        logger.info("⏹️ 定位服务已停止")

    def _loop(self):
        """主循环"""
        interval = 1.0 / CONFIG.hardware.sampling_rate_hz
        
        while self.running:
            start_time = time.time()
            
            try:
                # 1. 采集数据
                raw_gnss = self.hw.read_gnss_data()
                raw_5g = self.hw.read_5g_data()
                sos_triggered = self.hw.check_sos()
                
                # 2. 解析数据
                bds_dict = self._parse_gnss(raw_gnss) if raw_gnss else None
                five_g_dict = self._parse_5g(raw_5g) if raw_5g else None
                
                # 3. 融合解算
                if bds_dict or five_g_dict:
                    pos_data = self.fusion_engine.fuse(bds_dict, five_g_dict)
                    
                    # 4. 电子围栏检测
                    alerts = self.geo_manager.check_point(
                        self.user_id, 
                        pos_data.latitude, 
                        pos_data.longitude, 
                        pos_data.timestamp
                    )
                    
                    # 5. 数据处理
                    self._process_result(pos_data, alerts, sos_triggered)
                else:
                    logger.warning("未获取到有效定位数据")
                
            except Exception as e:
                logger.error(f"定位循环异常: {e}", exc_info=True)
            
            # 控制频率
            elapsed = time.time() - start_time
            sleep_time = max(0, interval - elapsed)
            time.sleep(sleep_time)

    def _parse_gnss(self, nmea: str) -> Optional[Dict]:
        """解析NMEA字符串"""
        if "GNGGA" not in nmea:
            return None
        parts = nmea.split(',')
        # 简化解析逻辑
        lat_deg = float(parts[2][:2])
        lat_min = float(parts[2][2:])
        lat = lat_deg + lat_min / 60.0
        if parts[3] == 'S': lat = -lat
        
        lon_deg = float(parts[4][:3])
        lon_min = float(parts[4][3:])
        lon = lon_deg + lon_min / 60.0
        if parts[5] == 'W': lon = -lon
        
        return {
            'lat': lat,
            'lon': lon,
            'alt': float(parts[9]),
            'sats': int(parts[7]),
            'hdop': float(parts[8])
        }

    def _parse_5g(self, json_str: str) -> Optional[Dict]:
        """解析5G数据"""
        try:
            data = json.loads(json_str)
            # 模拟将基站ID转换为坐标 (实际需查表)
            # 这里简单偏移一点
            return {
                'lat': 39.9042 + 0.0001,
                'lon': 116.4074 + 0.0001,
                'rssi': data.get('rssi')
            }
        except:
            return None

    def _process_result(self, pos: PositionData, alerts: list, sos: bool):
        """处理最终结果"""
        # 存库
        pos_dict = {
            'timestamp': pos.timestamp,
            'latitude': pos.latitude,
            'longitude': pos.longitude,
            'altitude': pos.altitude,
            'accuracy': pos.accuracy,
            'source': pos.source,
            'satellites': pos.satellites,
            'hdop': pos.hdop
        }
        self.db.save_location(self.user_id, pos_dict)
        
        # 处理报警
        if sos:
            alert_data = {'type': 'SOS', 'timestamp': time.time()}
            self.db.save_alert(self.user_id, alert_data)
            logger.critical("!!! SOS 报警已记录 !!!")
            
        for alert in alerts:
            self.db.save_alert(self.user_id, alert)
            # 此处可添加发送MQTT/HTTP通知逻辑
            
        # 打印调试信息
        logger.info(f"Pos: [{pos.source}] {pos.latitude:.6f}, {pos.longitude:.6f} Acc: {pos.accuracy:.2f}m")

# 全局服务实例
service_instance = None

def get_service(user_id: str = "USER_001") -> LocationService:
    global service_instance
    if service_instance is None:
        service_instance = LocationService(user_id)
    return service_instance
main.py (程序入口)
"""
基于5G+北斗的人员定位系统 - 主入口
Copyright (c) 2026. All Rights Reserved.
"""

import sys
import signal
import time
from config.logging_config import logger
from services.location_service import get_service
from models.database import DatabaseManager
from config.settings import CONFIG

def signal_handler(sig, frame):
    """处理退出信号"""
    logger.info("接收到退出信号,正在关闭系统...")
    svc = get_service()
    svc.stop()
    db = DatabaseManager()
    db.close()
    logger.info("系统已安全退出。")
    sys.exit(0)

def main():
    """主函数"""
    logger.info("="*50)
    logger.info(f"启动 {CONFIG.app_name} v{CONFIG.version}")
    logger.info(f"运行平台: 定制ARM架构 (Linux)")
    logger.info(f"Python版本: {sys.version}")
    logger.info("="*50)

    # 注册信号处理
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # 获取并启动服务
        service = get_service("WORKER_01")
        service.start()
        
        # 主线程保持运行
        while True:
            time.sleep(1)
            
    except Exception as e:
        logger.critical(f"系统发生严重错误: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    main()

6. 测试用例 (tests/)
tests/test_algorithms.py
"""
单元测试:核心算法模块
验证卡尔曼滤波和融合逻辑的正确性
"""

import unittest
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from core.algorithms import KalmanFilter1D, FusionEngine, PositionData

class TestKalmanFilter(unittest.TestCase):
    def test_smoothing(self):
        """测试滤波器的平滑效果"""
        kf = KalmanFilter1D(process_noise=0.001, measurement_noise=0.1)
        
        # 模拟带有噪声的测量值
        measurements = [10.0, 10.5, 9.8, 10.2, 10.1, 20.0, 10.0] # 20.0是异常值
        results = []
        
        for m in measurements:
            res = kf.update(m)
            results.append(res)
        
        # 断言:最后一个值应该被拉回接近10,而不是20
        self.assertLess(results[-1], 15.0)
        print(f"滤波前: {measurements}")
        print(f"滤波后: {results}")

class TestFusionEngine(unittest.TestCase):
    def test_outdoor_priority(self):
        """测试室外环境下北斗优先"""
        engine = FusionEngine()
        
        bds_data = {'lat': 39.9, 'lon': 116.4, 'sats': 12, 'hdop': 1.0, 'alt': 50}
        five_g_data = {'lat': 39.91, 'lon': 116.41, 'rssi': -70} # 偏差较大
        
        result = engine.fuse(bds_data, five_g_data)
        
        self.assertEqual(result.source, "BDS_PRIMARY")
        # 结果应更接近北斗数据
        self.assertAlmostEqual(result.latitude, 39.9, delta=0.005)

if __name__ == '__main__':
    unittest.main()

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区