• Python爬虫 爬取下载美国科研网站数据


    目录

    任务简介

    任务解决思路与经验收获

    具体步骤

    总结

                            我是政胤 期待你的关注


    大家好 我是政胤 今天教大家一个比较刑的爬虫案例

    Python爬虫 爬取下载美国科研网站数据

    制作不易 点个免费的关注 支持一下吧

    任务简介

    • 目标网站:https://app.powerbigov.us/view?r=eyJrIjoiYWEx...

    • 目标数据:下载2009-2013年的表格数据,并存储为CSV文件

        目标网站是漂亮国的科研数据,是PowerBI实现的网页数据,无法使用Ctrl+C复制内容,因此,求助于我们进行爬取。


    任务解决思路与经验收获

      首先任务可拆解为两个部分:一是从网站爬取数据到本地,二是解析数据输出CSV文件

    • 爬取数据部分:

      1. 解析网页,找到数据异步加载的实际请求地址与参数

      2. 书写爬虫代码获取全部数据

    • 解析数据部分

      这是本次任务的主要难点所在,难点在于:在返回的数据list中,元素不是固定的个数,只有与上一行不同的数值,而具体哪一列不同、哪一列相同,是使用一个“R”值表示,正常解决思路是要通过JS逆向,找出解析R关系的函数,完成解析。但是,由于网页的JS非常复杂,且许多函数名都是简写,阅读十分困难,一直没有逆向成功

      在解决该问题上,先是手工查询总结关系,完成了第一个版本,没想到后续在写写这篇分享文章时突然思路打开,改变了请求数据方式,绕过了分析R关系的步骤:

    • 方案一:按正常请求,使用R关系解析数据

      下载完整的数据后分析,所需要的2009至2013年的数据中,R关系一共有124种,最小值0、最大值4083,通过人工查询这124种关系,制作成字典,完成解析。总结出的关系如下图(手工查询了5个小时,累啊):

    • 方案二:以时间换空间,每次仅请求一行数据,绕过解析R关系的难题
        在复盘时,突然头脑开窍,请求到的数据第一行一定是完整的,要是每次只请求一行数据,那就不可能存在与上一行相同的情形了,这种情况下就能绕过解析R关系这一难题。测试后方案可行,只是需要考虑以下问题:

      1. 开启多线程加速以缩时间,但即使开启多线程,也只能按12个年份开启12个线程,而行数最多的年份约2万行,爬虫需要运行约5至6个小时

      2. 断点续爬,避免程序异常中断后,需要从头开始;


    具体步骤

    1. 目标网站分析

      第一步当然是对目标网站进行分析,找到数据正确的请求地址,这点很容易,打开Chrome的开发者模式,向下拖动滚动条,看到新出现的请求,就是真实的地址了,直接上结果:

    然后看一下POST请求的参数

    请求的参数

    再看一下Headers,意外发现,竟然没有反爬!没有反爬!没有反爬!好吧,漂亮国的网站就是大气。

    • 分析小结:
      1. # 完整参数就略过,关键参数以下三项:
      2. # 1.筛选年份的参数,示例:
      3. param['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Query']['Where'][0]['Condition']['In']['Values'][0][0]['Literal']['Value'= '2009L'
      4. # 2.请求下一批数据(请求首批数据时无需传入该参数),示例:
      5. param['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Binding']['DataReduction']['Primary']['Window']['RestartTokens'= [["'Augusto E Caballero-Robles'","'Physician'","'159984'","'Daiichi Sankyo, Inc.'","'CC0131'","'Basking Ridge'","'NJ'","'Compensation for Bona Fide Services'","2009L","'4753'"]]
      6. # 注:以上"RestartTokens"的值在前一批数据的response中,为上一批数据的返回字典值,示例res['results'][0]['result']['data']['dsr']['DS'][0]['RT']
      7. # 3.请求页面的行数(浏览器访问默认是500/页,但爬虫访问的话...你懂的),示例:
      8. param['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Binding']['DataReduction']['Primary']['Window']['Count'= 500
      9. # 参数还有很多,例如排序的参数ordby,各种筛选项等
      • 请求数据的网址:https://wabi-us-gov-virginia-api.analysis.usgovcloudapi.net/public/reports/querydata?synchronous=true

      • POST的关键参数:

    1. 爬虫主要步骤及代码

      • 在获取全部数据上,则使用一个while True死循环,每次请求返回值中如有"RT"关键字,则修改POST参数,发起下一个请求,直至返回值中没有"RT"关键字,代表全部数据爬取结束(详见代码)

      • 请求过程中出现的异常需要捕获,根据异常类型决定下一步操作,由于该网站没有反爬,只有超时或连接错误的异常,因此,只需要重启发起请求即可,因此可以不考虑断点续爬

      • 以上步骤,详细代码如下:

      • 网站没有反爬,找到正确的路径和参数后,在爬虫代码实现上相对简单,直接发起post请求即可,代码中通过PageSpider类实现(详细代码附后)

      • 在断点续传上,通过流程解决,把每行数据存储到TXT文件中,文件名记录年份以及行数,先读取已爬取的记录,找到最后一次请求结果,然后发起后续请求。

        1. """
        2. 爬取页面数据的爬虫
        3. """
        4. import pathlib as pl
        5. import requests
        6. import json
        7. import time
        8. import threading
        9. import urllib3
        10. def get_cost_time(start: time.time, end: time.time = None):
        11. """
        12. 计算间隔时长的方法
        13. :param start: 起始时间
        14. :param end: 结束时间,默认为空,按最新时间计算
        15. :return: 时分秒格式
        16. """
        17. if not end:
        18. end = time.time()
        19. cost = end - start
        20. days = int(cost / 86400)
        21. hours = int(cost % 86400 / 3600)
        22. mins = int(cost % 3600 / 60)
        23. secs = round(cost % 60, 4)
        24. text = ''
        25. if days:
        26. text = f'{text}{days}天'
        27. if hours:
        28. text = f'{text}{hours}小时'
        29. if mins:
        30. text = f'{text}{mins}分钟'
        31. if secs:
        32. text = f'{text}{secs}秒'
        33. return text
        34. class PageSpider:
        35. def __init__(self, year: int, nrows: int = 500, timeout: int = 30):
        36. """
        37. 初始化爬虫的参数
        38. :param year: 下载数据的年份,默认空,不筛选年份,取得全量数据
        39. :param nrows: 每次请求获取的数据行数,默认500,最大30000(服务器自动限制,超过无效)
        40. :param timeout: 超时等待时长
        41. """
        42. self.year = year if year else 'all'
        43. self.timeout = timeout
        44. # 请求数据的地址
        45. self.url = 'https://wabi-us-gov-virginia-api.analysis.usgovcloudapi.net/public/reports/querydata?synchronous=true'
        46. # 请求头
        47. self.headers = {
        48. # 太长省略,自行在浏览器中复制
        49. }
        50. # 默认参数
        51. self.params = {
        52. # 太长省略,自行在浏览器中复制
        53. }
        54. # 修改默认参数中的每次请求的行数
        55. self.params['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Binding']['DataReduction'][
        56. 'Primary']['Window']['Count'] = nrows
        57. # 修改默认参数中请求的年份
        58. if self.year != 'all':
        59. self.params['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Query']['Where'][0][
        60. 'Condition']['In']['Values'][0][0]['Literal']['Value'] = f'{year}L'
        61. @classmethod
        62. def read_json(cls, file_path: pl.Path):
        63. with open(file_path, 'r', encoding='utf-8') as fin:
        64. res = json.loads(fin.read())
        65. return res
        66. def get_idx_and_rt(self):
        67. """
        68. 获取已经爬取过的信息,最大的idx以及请求下一页的参数
        69. """
        70. single = True
        71. tmp_path = pl.Path('./tmp/')
        72. if not tmp_path.is_dir():
        73. tmp_path.mkdir()
        74. files = list(tmp_path.glob(f'{self.year}_part*.txt'))
        75. if files:
        76. idx = max([int(filename.stem.replace(f'{self.year}_part', '')) for filename in files])
        77. res = self.read_json(tmp_path / f'{self.year}_part{idx}.txt')
        78. key = res['results'][0]['result']['data']['dsr']['DS'][0].get('RT')
        79. if not key:
        80. single = False
        81. else:
        82. idx = 0
        83. key = None
        84. return idx, key, single
        85. def make_params(self, key: list = None) -> dict:
        86. """
        87. 制作请求体中的参数
        88. :param key: 下一页的关键字RestartTokens,默认空,第一次请求时无需传入该参数
        89. :return: dict
        90. """
        91. params = self.params.copy()
        92. if key:
        93. params['queries'][0]['Query']['Commands'][0]['SemanticQueryDataShapeCommand']['Binding']['DataReduction'][
        94. 'Primary']['Window']['RestartTokens'] = key
        95. return params
        96. def crawl_pages(self, idx: int = 1, key: list = None):
        97. """
        98. 爬取页面并输出TXT文件的方法,
        99. :param idx: 爬取的索引值,默认为1,在每行爬取时,代表行数
        100. :param key: 下一页的关键字RestartTokens,默认空,第一次请求时无需传入该参数
        101. :return: None
        102. """
        103. start = time.time()
        104. while True: # 创建死循环爬取直至结束
        105. try:
        106. res = requests.post(url=self.url, headers=self.headers, json=self.make_params(key),
        107. timeout=self.timeout)
        108. except (
        109. requests.exceptions.ConnectTimeout,
        110. requests.exceptions.ConnectionError,
        111. urllib3.exceptions.ConnectionError,
        112. urllib3.exceptions.ConnectTimeoutError
        113. ): # 捕获超时异常 或 连接异常
        114. print(f'{self.year}_part{idx}: timeout, wait 5 seconds retry')
        115. time.sleep(5) # 休息5秒后再次请求
        116. continue # 跳过后续步骤
        117. except Exception as e: # 其他异常,打印一下异常信息
        118. print(f'{self.year}_part{idx} Error: {e}')
        119. time.sleep(5) # 休息5秒后再次请求
        120. continue # 跳过后续步骤
        121. if res.status_code == 200:
        122. with open(f'./tmp/{self.year}_part{idx}.txt', 'w', encoding='utf-8') as fout:
        123. fout.write(res.text)
        124. if idx % 100 == 0:
        125. print(f'{self.year}的第{idx}行数据写入完成,已用时: {get_cost_time(start)}')
        126. key = json.loads(res.text)['results'][0]['result']['data']['dsr']['DS'][0].get('RT', None)
        127. if not key: # 如果没有RT值,说明已经全部爬取完毕了,打印一下信息退出
        128. print(f'{self.year} completed max_idx is {idx}')
        129. return
        130. idx += 1
        131. else: # 打印一下信息重新请求
        132. print(f'{self.year}_part{idx} not 200,check please', res.text)
        133. continue
        134. def mul_crawl(year: int, nrows: int = 2):
        135. """
        136. 多线程爬取的方法,注按行爬取
        137. :param year: 需要爬取的年份
        138. :param nrows: 每份爬取的行数,若每次仅爬取1行数据,nrows参数需要为2,才会有下一行,否则都是第一行
        139. """
        140. # 定义爬虫对象
        141. spider = PageSpider(year, nrows=nrows)
        142. # 获取爬取对象已爬取的idx,key和是否完成爬取的信号single
        143. idx, key, single = spider.get_idx_and_rt()
        144. if not single:
        145. print(f'{year}年的共{idx}行数据已经全部下载,无需爬取')
        146. return
        147. print(f'{year}年的爬虫任务启动, 从{idx+1}行开始爬取')
        148. spider.crawl_pages(idx+1, key) # 特别注意,已经爬取了idx行,重启时,下一行需要+1,否则重启后,会覆盖一行数据
        149. if __name__ == '__main__':
        150. pools = []
        151. for y in range(2009, 2021):
        152. pool = threading.Thread(
        153. target=mul_crawl, args=(y, 2), name=f'{y}_thread' # 按行爬取,nrows参数需要为2
        154. )
        155. pool.start()
        156. pools.append(pool)
        157. for pool in pools:
        158. pool.join()
        159. print('任务全部完成')

    代码运行示例:

    以时间换空间,每次仅请求一行,绕过R关系解析

    1. 解析数据

      • 方案一

      解析数据困难的部分就是找出R关系规律,这部分是使用手工查询来解决的,直接上代码吧:

    1. class ParseData:
    2.     """
    3.     解析数据的对象
    4.     """
    5.     def __init__(self, file_path: pl.Path = None):
    6.         """
    7.         初始化对象
    8.         :param file_path: TXT数据存放的路径,默认自身目录下的tmp文件夹
    9.         """
    10.         self.file_path = pl.Path('./tmp'if not file_path else file_path
    11.         self.files = list(self.file_path.glob('2*.txt'))
    12.         self.cols_dict = None
    13.         self.colname_dict = {
    14.             'D0''License Type',
    15.             'D1''License Number',
    16.             'D2''Manufacturer Full Name',
    17.             'D3''Manufacturer ID',
    18.             'D4''City',
    19.             'D5''State',
    20.             'D6''Full Name',
    21.             'D7''Payment Category',
    22.             'D8''Covered Recipient ID'
    23.         }
    24.         self.colname_dict_T = {v: k for k, v in self.colname_dict.items()}
    25.     def make_excels(self):
    26.         """
    27.         将每个数据文件单独转换为excel数据表用于分析每份数据
    28.         :return:
    29.         """
    30.         for file in self.files:
    31.             with open(file, 'r'as fin:
    32.                 res = json.loads(fin.read())
    33.             dfx = pd.DataFrame(res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'])
    34.             dfx['filename'] = file.stem
    35.             dfx[['year''part']] = dfx['filename'].str.split('_', expand=True)
    36.             dfx['C_count'] = dfx['C'].map(len)
    37.             writer = pd.ExcelWriter(self.file_path / f'{file.stem}.xlsx')
    38.             dfx.to_excel(writer, sheet_name='data')
    39.             for k, v in res['results'][0]['result']['data']['dsr']['DS'][0]['ValueDicts'].items():
    40.                 dfx = pd.Series(v).to_frame()
    41.                 dfx.to_excel(writer, sheet_name=k)
    42.             writer.save()
    43.         print('所有数据均已转为Excel')
    44.     def make_single_excel(self):
    45.         """
    46.         将所有数据生成一份excel文件,不包含字典
    47.         :return:
    48.         """
    49.         # 合并成整个文件
    50.         df = pd.DataFrame()
    51.         for file in self.files:
    52.             with open(file, 'r'as fin:
    53.                 res = json.loads(fin.read())
    54.             dfx = pd.DataFrame(res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'])
    55.             dfx['filename'] = file.stem
    56.             dfx[['year''part']] = dfx['filename'].str.split('_', expand=True)
    57.             dfx['C_count'] = dfx['C'].map(len)
    58.             df = pd.concat([df, dfx])
    59.         return df
    60.     def get_cols_dict(self):
    61.         """
    62.         读取列关系的字典
    63.         :return:
    64.         """
    65.         # 读取列字典表
    66.         self.cols_dict = pd.read_excel(self.file_path.parent / 'cols_dict.xlsx')
    67.         self.cols_dict.set_index('R', inplace=True)
    68.         self.cols_dict = self.cols_dict.dropna()
    69.         self.cols_dict.drop(columns=['C_count', ], inplace=True)
    70.         self.cols_dict.columns = [col.split(':')[-1for col in self.cols_dict.columns]
    71.         self.cols_dict = self.cols_dict.astype('int')
    72.     def make_dataframe(self, filename):
    73.         """
    74.         读取TXT文件,转换成dataframe
    75.         :param filename: 需要转换的文件
    76.         :return: 
    77.         """
    78.         with open(filename, 'r'as fin:
    79.             res = json.loads(fin.read())
    80.         df0 = pd.DataFrame(res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'])
    81.         df0['R'] = df0['R'].fillna(0)
    82.         df0['R'] = df0['R'].map(int)
    83.         values_dict = res['results'][0]['result']['data']['dsr']['DS'][0]['ValueDicts']
    84.         dfx = []
    85.         for idx in df0.index:
    86.             row_value = df0.loc[idx, 'C'].copy()
    87.             cols = self.cols_dict.loc[int(df0.loc[idx, 'R'])].to_dict()
    88.             row = {}
    89.             for col in ['License Type''License Number''Manufacturer Full Name''Manufacturer ID''City''State',
    90.                         'Full Name''Payment Category''Disclosure Year''Covered Recipient ID''Amount of Payment',
    91.                         'Number of Events Reflected']:
    92.                 v = cols.get(col)
    93.                 if v:
    94.                     value = row_value.pop(0)
    95.                     if col in self.colname_dict.values():
    96.                         if not isinstance(value, str):
    97.                             value_list = values_dict.get(self.colname_dict_T.get(col), [])
    98.                             value = value_list[value]
    99.                     row[col] = value
    100.                 else:
    101.                     row[col] = None
    102.             row['R'] = int(df0.loc[idx, 'R'])
    103.             dfx.append(row)
    104.         dfx = pd.DataFrame(dfx)
    105.         dfx = dfx.fillna(method='ffill')
    106.         dfx[['Disclosure Year''Number of Events Reflected']] = dfx[
    107.             ['Disclosure Year''Number of Events Reflected']].astype('int')
    108.         dfx = dfx[['Covered Recipient ID''Full Name''License Type''License Number''Manufacturer ID',
    109.                    'Manufacturer Full Name''City',
    110.                    'State''Payment Category''Amount of Payment''Number of Events Reflected''Disclosure Year',
    111.                    'R']]
    112.         return dfx
    113.     def parse_data(self, out_name: str = None):
    114.         """
    115.         解析合并数据
    116.         :param out_name: 输出的文件名
    117.         :return: 
    118.         """
    119.         df = pd.DataFrame()
    120.         for n, f in enumerate(self.files):
    121.             dfx = self.make_dataframe(f)
    122.             df = pd.concat([df, dfx])
    123.             print(f'完成第{n + 1}个文件,剩余{len(self.files) - n - 1}个,共{len(self.files)}个')
    124.         df.drop(columns='R').to_csv(self.file_path / f'{out_name}.csv', index=False)
    125.         return df
    • 方案二

      使用方案二处理数据时,在进行数据后验后发现,还有两个细节问题需要解决:
      一是返回值中出现了新的关键字“Ø”,经手工验证才知道代表输出的行中,存在本身就是空值的情况,遍历数据后,发现只有出现3个不同值(60, 128, 2048),因此,手工制作了col_dict(详见代码)。\

    1. class ParseDatav2:
    2.     """
    3.     解析数据的对象第二版,将按行爬取的的json文件,转换成dataframe,增量写入csv文件,
    4.     因每次请求一行,首行数据不存在与上一行相同情形,因此,除个别本身无数据情况,绝大多数均为完整的12列数据,
    5.     """
    6.     def __init__(self):
    7.         """
    8.         初始化
    9.         """
    10.         # 初始化一行的dataframe,
    11.         self.row = pd.DataFrame([
    12.             'Covered Recipient ID''Full Name''License Type''License Number''Manufacturer ID',
    13.             'Manufacturer Full Name''City''State''Payment Category''Amount of Payment',
    14.             'Number of Events Reflected''Disclosure Year'
    15.         ]).set_index(0)
    16.         self.row[0] = None
    17.         self.row = self.row.T
    18.         self.row['idx'] = None
    19.         # 根据 Ø 值的不同选择不同的列,目前仅三种不同的Ø值,注0为默认值,指包含所有列
    20.         self.col_dict = {
    21.             # 完整的12列
    22.             0: ['License Type''License Number''Manufacturer Full Name''Manufacturer ID''City''State',
    23.                 'Full Name''Payment Category''Disclosure Year''Covered Recipient ID''Amount of Payment',
    24.                 'Number of Events Reflected'],
    25.             # 有4列是空值,分别是 'Manufacturer Full Name', 'Manufacturer ID', 'City', 'State'
    26.             60: ['License Type''License Number',
    27.                  'Full Name''Payment Category''Disclosure Year''Covered Recipient ID''Amount of Payment',
    28.                  'Number of Events Reflected'],
    29.             # 有1列是空值,是 'Payment Category'
    30.             128: ['License Type''License Number''Manufacturer Full Name''Manufacturer ID''City''State',
    31.                   'Full Name''Disclosure Year''Covered Recipient ID''Amount of Payment',
    32.                   'Number of Events Reflected'],
    33.             # 有1列是空值,是 'Number of Events Reflected'
    34.             2048: ['License Type''License Number''Manufacturer Full Name''Manufacturer ID''City''State',
    35.                    'Full Name''Payment Category''Disclosure Year''Covered Recipient ID''Amount of Payment'],
    36.         }
    37.         # 列名转换字典
    38.         self.colname_dict = {
    39.             'License Type''D0',
    40.             'License Number''D1',
    41.             'Manufacturer Full Name''D2',
    42.             'Manufacturer ID''D3',
    43.             'City''D4',
    44.             'State''D5',
    45.             'Full Name''D6',
    46.             'Payment Category''D7',
    47.             'Covered Recipient ID''D8'
    48.         }
    49.         # 储存爬取的json文件的路径
    50.         self.data_path = pl.Path('./tmp')
    51.         # 获取json文件的迭代器
    52.         self.files = self.data_path.glob('*.txt')
    53.         # 初始化输出文件的名称及路径
    54.         self.file_name = self.data_path.parent / 'data.csv'
    55.     def create_csv(self):
    56.         """
    57.         先输出一个CSV文件头用于增量写入数据
    58.         :return:
    59.         """
    60.         self.row.drop(0, axis=0).to_csv(self.file_name, index=False)
    61.     def parse_data(self, filename: pl.Path):
    62.         """
    63.         读取按1行数据请求获取的json文件,一行数据
    64.         :param filename: json文件的路径
    65.         :return: None
    66.         """
    67.         row = self.row.copy()  # 复制一行dataframe用于后续修改
    68.         res = PageSpider.read_json(filename)
    69.         # 获取数据中的valuedicts
    70.         valuedicts = res['results'][0]['result']['data']['dsr']['DS'][0]['ValueDicts']
    71.         # 获取数据中每行的数据
    72.         row_values = res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'][0]['C']
    73.         # 获取数据中的'Ø'值(若有),该值代表输出的行中,存在空白部分,用于确定数据列
    74.         cols = ic(self.col_dict.get(
    75.             res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'][0].get('Ø'0)
    76.         ))
    77.         # 遍历每行数据,修改row这个dataframe的值
    78.         for col, value in zip(cols, row_values):
    79.             ic(col, value)
    80.             colname = self.colname_dict.get(col)  # colname转换,D0~D8
    81.             if colname:  # 如果非空,则需要转换值
    82.                 value = valuedicts.get(self.colname_dict.get(col))[0]
    83.             # 修改dataframe数据
    84.             row.loc[0, col] = value
    85.         # 写入索引值
    86.         row['idx'] = int(filename.stem.split('_')[-1].replace('part'''))
    87.         return row
    88.     def run(self):
    89.         """
    90.         运行写入程序
    91.         """
    92.         self.create_csv()
    93.         for idx, filename in enumerate(self.files):
    94.             row = self.parse_data(filename)
    95.             row.to_csv(self.file_name, mode='a', header=None, index=False)
    96.             print(f'第{idx + 1}个文件{filename.stem}写入表格成功')
    97.         print('全部文件写入完成')

      二是每行数据请求,nrows需要设置为2,而最后一行数据无法通过该方式获取,因此,需要从最后一个返回的json数据中解析出最后一行数据(详见LastRow类)

    1. class LastRow:
    2.     """
    3.     获取并写入最后一行数据的类
    4.     由于每次请求一行数据的方式,存在缺陷,无法获取到最后一行数据,
    5.     本方法是对最后一个能够获取的json(倒数第二行)进行解析,取得最后一行数据,
    6.     本方法存在缺陷,即默认最后一行“Amount of Payment”列值一定与倒数第二行不同,
    7.     目前2009年至2020年共12年的数据中,均满足上述条件,没有出错。
    8.     除本方法外,还可以通过逆转排序请求的方式,获取最后一行数据
    9.     """
    10.     def __init__(self):
    11.         """
    12.         初始化
    13.         """
    14.         self.file_path = pl.Path('./tmp')  # 存储爬取json数据的路径
    15.         self.files_df = pd.DataFrame()  # 初始化最后一份请求的dataframe
    16.         # 列名对应的字典
    17.         self.colname_dict = {
    18.             'D0''License Type',
    19.             'D1''License Number',
    20.             'D2''Manufacturer Full Name',
    21.             'D3''Manufacturer ID',
    22.             'D4''City',
    23.             'D5''State',
    24.             'D6''Full Name',
    25.             'D7''Payment Category',
    26.             'year''Disclosure Year',
    27.             'D8''Covered Recipient ID',
    28.             'M0''Amount of Payment',
    29.             'M1''Number of Events Reflected'
    30.         }  
    31.         self.data = pd.DataFrame()  # 初始化最后一行数据data
    32.     def get_last_file(self):
    33.         """
    34.         遍历文件夹,取得最后一份请求的dataframe
    35.         """
    36.         self.files_df = pd.DataFrame(list(self.file_path.glob('*.txt')), columns=['filename'])
    37.         self.files_df[['year''idx']] = self.files_df['filename'].map(lambda x: x.stem).str.split('_', expand=True)
    38.         self.files_df['idx'] = self.files_df['idx'].str.replace('part''')
    39.         self.files_df['idx'] = self.files_df['idx'].astype(int)
    40.         self.files_df.sort_values(by=['year''idx'], inplace=True)
    41.         self.files_df = self.files_df.drop_duplicates('year', keep='last')
    42.     def get_last_row(selfser: pd.Series) -> pd.DataFrame:
    43.         """
    44.         解析文件,获取最后一行的数据
    45.         :param ser: 一行文件信息的series
    46.         """
    47.         # 读取json数据
    48.         res = PageSpider.read_json(ser['filename'])
    49.         # 获取values_dict
    50.         values_dict = res['results'][0]['result']['data']['dsr']['DS'][0]['ValueDicts']
    51.         # 获取文件中的第一行数据
    52.         row_values = res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'][0]['C']
    53.         # 获取文件中的下一行数据,因文件是倒数第二行的数据,因此下一行即为最后一行
    54.         next_row_values = res['results'][0]['result']['data']['dsr']['DS'][0]['PH'][0]['DM0'][1]['C']
    55.         # 初始化Series
    56.         row = pd.Series()
    57.         # 解析数据填充series
    58.         for k, col in self.colname_dict.items():
    59.             value = row_values.pop(0)
    60.             if k.startswith('D'):  # 如果K值是D开头
    61.                 values = values_dict[k]
    62.                 if len(values) == 2:
    63.                     value = next_row_values.pop(0)
    64.                 value = values[-1]
    65.             elif k == 'year':
    66.                 pass
    67.             else:
    68.                 if next_row_values:
    69.                    value = next_row_values.pop(0)
    70.             row[col] = value
    71.         row['idx'] = ser['idx'] + 1
    72.         row = row.to_frame().T
    73.         return row
    74.     def run(self):
    75.         """
    76.         运行获取最后一行数据的方法
    77.         """
    78.         self.get_last_file()
    79.         for i in self.files_df.index:
    80.             self.data = pd.concat([self.data, self.get_last_row(self.files_df.loc[i])])
    81.         self.data = self.data[[
    82.             'Covered Recipient ID''Full Name''License Type''License Number''Manufacturer ID',
    83.             'Manufacturer Full Name''City''State''Payment Category''Amount of Payment',
    84.             'Number of Events Reflected''Disclosure Year''idx'
    85.         ]]
    86.         filename = self.file_path.parent / 'data.csv'
    87.         self.data.to_csv(filename, mode='a', index=False, header=None)
    88.         return self.data
    1. 结果展示

    2010年

    2015年

    2020年尾部

    1. 延伸思考

      如果将上述方案一与方案二结合,整理出所有不同R关系的行样例,使用方案二爬取少量的部分示例,然后推导出完整的R关系字典,再使用方案一的方法进行爬取解析,将大大节约时间。该方式在数据量远远超过当前数量时,可以考虑使用。


    总结

      完成整个项目过程中历经了:暗爽(不到1小时就完成了爬虫部分功能)->迷茫(JS逆向失败,无法总结R关系规律)->焦虑与烦躁(担心无法完成任务,手工查询规则5个多小时)->开窍(复盘过程中突然发现新思路)一系列过程。最终结果还是较为顺利的完成了整个任务,而最大的感触还是思路的开拓:一条路走不通时,也许换个方向就能解决问题(注:count参数500一开始就使用了,只是一直在增加请求的行数,而一直没有想到减少请求的行数这么一个小小的改变,就能带来巨大的突破)。

                            我是政胤 期待你的关注

         

  • 相关阅读:
    CSS----字体属性
    SpringBoot3 配置Logback日志滚动文件
    表内容的操作(增删查改)【MySQL】
    Ubuntu20 QT6.0 编译 ODBC 驱动
    使用SSM搭建图书商城管理系统(完整过程介绍、售后服务哈哈哈)
    电流继电器JL-8GB/11/AC220V
    目标检测论文解读复现之十六:基于改进YOLOv5的小目标检测算法
    无胁科技-TVD每日漏洞情报-2022-11-28
    华纳云:centos系统中怎么查看cpu信息?
    java178-终篇?静态代理?动态代理?
  • 原文地址:https://blog.csdn.net/m0_69043821/article/details/125827925