广电开始有电信业务了,号码归属地表又是一顿更新,如果跟不上步伐可能会引发一些事故。目前暂时本人暂时无法做到自己去爬数据,所以就只能借助网上大佬的数据库来使用了。
找了一圈,找到了lovedboy大佬的github库。以及相对应的脚本。原本的使用方法应该是数据文件+脚本形成可以即时调用的方法,但无奈自家是写进mysql的,只能无奈将其导出。

修改了大佬的原调用脚本,测试失败。😅
索性摆烂,将大佬脚本和需求丢进chatgpt跑一圈,出来以下脚本,试运行,丝滑。😲
放出来给大家看看。

# -*- coding: utf-8 -*-

import os
import struct
import sys

__author__ = 'lovedboy'

if sys.version_info > (3, 0):
    def get_record_content(buf, start_offset):
        end_offset = buf.find(b'\x00', start_offset)
        return buf[start_offset:end_offset].decode()
else:
    def get_record_content(buf, start_offset):
        end_offset = buf.find('\x00', start_offset)
        return buf[start_offset:end_offset]

class Phone(object):
    def __init__(self, dat_file=None):
        if dat_file is None:
            dat_file = os.path.join(os.path.dirname(__file__), "phone.dat")

        with open(dat_file, 'rb') as f:
            self.buf = f.read()

        self.head_fmt = "<4si"
        self.phone_fmt = "<iiB"
        self.head_fmt_length = struct.calcsize(self.head_fmt)
        self.phone_fmt_length = struct.calcsize(self.phone_fmt)
        self.version, self.first_phone_record_offset = struct.unpack(
            self.head_fmt, self.buf[:self.head_fmt_length])
        self.phone_record_count = (len(self.buf) - self.first_phone_record_offset) // self.phone_fmt_length

    def get_phone_dat_msg(self):
        print("版本号:{}".format(self.version))
        print("总记录条数:{}".format(self.phone_record_count))

    @staticmethod
    def get_phone_no_type(no):
        if no == 4:
            return "电信虚拟运营商"
        if no == 5:
            return "联通虚拟运营商"
        if no == 6:
            return "移动虚拟运营商"
        if no == 3:
            return "电信"
        if no == 2:
            return "联通"
        if no == 1:
            return "移动"

    @staticmethod
    def _format_phone_content(phone_num, record_content, phone_type):
        province, city, zip_code, area_code = record_content.split('|')
        return {
            "phone": phone_num,
            "province": province,
            "city": city,
            "zip_code": zip_code,
            "area_code": area_code,
            "phone_type": Phone.get_phone_no_type(phone_type)
        }

    def _parse_record(self, record):
        phone_num, record_offset, phone_type = struct.unpack(self.phone_fmt, record)
        record_content = get_record_content(self.buf, record_offset)
        return Phone._format_phone_content(phone_num, record_content, phone_type)

    def all(self):
        records = []
        buflen = len(self.buf)
        for i in range(self.first_phone_record_offset, buflen, self.phone_fmt_length):
            record = self.buf[i: i + self.phone_fmt_length]
            records.append(self._parse_record(record))
        return records

    def test(self):
        self.get_phone_dat_msg()
        records = self.all()
        for record in records:
            print(self.human_phone_info(record))

    @staticmethod
    def human_phone_info(phone_info):
        if not phone_info:
            return ''

        return "{}|{}|{}|{}|{}|{}".format(phone_info['phone'],
                                          phone_info['province'],
                                          phone_info['city'],
                                          phone_info['zip_code'],
                                          phone_info['area_code'],
                                          phone_info['phone_type'])

if __name__ == "__main__":
    phone = Phone()
    phone.test()