- import logging
- from bs4 import BeautifulSoup
- from datetime import datetime, timedelta
- import time
- import requests
- from concurrent.futures import ThreadPoolExecutor
-
- # 配置日志记录器
- logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s',
- filename='get_ip_error.log')
-
- time_threshold = 15 # 录入IP的时间和当前时间差阈值小于等于15分钟时进行检查
- page_valid = 3 # 当每次超过X个有效IP时返回
-
-
- url_kuai= 'http://www.kuaidaili.com/free/inha/'
- #请求头
- headers = {
-
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- }
-
-
- def check_proxy(p):
- '''
- 多线程检查代理IP的有效性
- :param p: 代理IP列表[]
- :return: 有效的代理IP列表
- '''
- url = "http://httpbin.org/ip" # 用于测试代理IP有效性的网站
- valid_proxies = []
-
- def check_single_proxy(proxy):
- proxies = {
- "http": f"http://{proxy['ip']}:{proxy['port']}",
- # "https": f"https://{proxy['ip']}:{proxy['port']}"
- }
- try:
- response = requests.get(url, proxies=proxies, timeout=1.5)
- if response.ok:
- valid_proxies.append(proxy)
- except requests.exceptions.RequestException as e:
- pass
-
- # 开启多线程检查
- with ThreadPoolExecutor() as executor:
- executor.map(check_single_proxy, p)
-
- return valid_proxies
-
-
-
-
- def get_kuaidaili(page):
- '''
- 获取89ip的代理IP列表
- :param page: 页码
- :return: 有效的代理IP列表和页码
- '''
-
- try:
- valid_ip_list = []
-
- while page <= 25:#在这里,只有当有效ip数量>=3的时候,他才会停止;否则,就会不停增加page去爬取;
- full_url = url_kuai+str(page) # 根据页码构建URL
- print('当前正在爬取网页--->:', full_url)
- response = requests.get(full_url,headers=headers)
- if response.ok:
- html = response.text
- soup = BeautifulSoup(html, 'html.parser')
- table = soup.find('table', class_='table table-b table-bordered table-striped')
- tbody = table.find('tbody')
-
- # if time_diff(tbody,6): #如果时间差在XX分钟以内,就开始检验ip(第6个td是时间) 他更新不快,所以不能用时间去筛选
- valid_proxies = tbody_add_proxy(tbody,6) # 获取tbody的数据(time的位置=6)
-
- if valid_proxies is not None and len(valid_proxies) > 0:
- valid_ip_list.extend(valid_proxies)
-
-
- if len(valid_ip_list) >= page_valid: # 有效 IP 数量大于等于 page_valid 就停止爬取
- break
-
- page += 1
- else:
- page=0
-
- #当page>25的时候,page=1重新开始
- return valid_ip_list, page
-
-
-
- except requests.exceptions.RequestException as e:
- print(f"爬取异常: {e}")
- return valid_ip_list, page
-
-
-
-
-
-
-
- def time_diff(table,much):
- '''
- 查询当前页的IP更新时间,判断是否小于等于阈值 time_threshold,若是返回True
- :param table: IP表格
- :return: 时间是否小于等于阈值
- '''
- rows = table.find_all('tr')
-
- given_time = datetime.strptime(rows[0].find_all('td')[much].text.strip(), "%Y/%m/%d %H:%M:%S")
- current_time = datetime.now()
- time_difference = current_time - given_time
-
- return time_difference <= timedelta(minutes=time_threshold)
-
-
- def tbody_add_proxy(tbody,much):
- '''
- 提取代理IP和端口信息,并将其构建为列表形式
- :param tbody: 表格内容
- :return: 代理IP和端口的列表
- '''
- proxy_list = []
-
- rows = tbody.find_all('tr')
- for row in rows:
- proxy = {}
- cells = row.find_all('td')
- proxy["ip"] = cells[0].text.strip()
- proxy["port"] = cells[1].text.strip()
- proxy["time"] = cells[much].text.strip()
- proxy_list.append(proxy)
- return check_proxy(proxy_list) #返回有效的ip[(list)]
-
-
-
-
-
-
-
- import redis
- import json
-
- # 创建连接池
- pool_config = {
- 'host': 'localhost',
- 'port': 6379,
- 'db': 0,
- 'max_connections': 10,
- 'decode_responses': True,
- 'encoding': 'utf-8'
- }
-
- # 创建redis连接池
- pool = redis.ConnectionPool(**pool_config)
-
-
- def add_to_sorted_set(ip):
- '''
- 将IP添加到有序集合中,确保唯一性
- :param ip: IP信息的字典
- '''
- r = redis.Redis(connection_pool=pool)
- ip_de = json.dumps(ip)
-
- # 判断IP在有序集合中是否已存在
- if not r.zscore('valid', ip_de):
- r.zadd('valid', {ip_de: 0})
-
-
-
- def find_valid_ip():
- '''
- 获取当前Redis中有效IP的数量
- :return: 有效IP的数量
- '''
- r = redis.Redis(connection_pool=pool)
- count = r.zcard('valid') #因为是有序集合,所以需要用zcard
- if count is None:
- return 0
- return count
-
- def pop_from_sorted_set():
- '''
- 从有序集合中弹出一个元素(按添加顺序)
- :return: 弹出的IP信息字典
- '''
- r = redis.Redis(connection_pool=pool)
- ip_de = r.zrange('valid', 0, 0)[0]
-
- # 从有序集合中移除已弹出的元素
- r.zrem('valid', ip_de)
-
- return json.loads(ip_de)
-
-
-
-
-
-
-
- from redis_task import redis_task as redis,get_ip
-
- import time
- import requests
-
-
-
-
-
- # 配置日志记录器
-
-
-
-
-
- import time
-
- def ip_control():
- '''
- 1.检查redis里面是否有足够的有效ip(>10)
- - 足够
- - 达到目标数量(例如 20)后停止更新
- - 休眠一段时间后再继续更新
- - 不足够
- - 开始从 check_url(url_parse_dict) 获取新的有效ip
- - 新的ip如果与现有的 redis ip 重复,则不放入
- - 不重复则放入,直到 redis 有效 ip 数量达到目标数量
- '''
- target_count = 20 # 目标有效 IP 数量
- current_page = 1 # 保存当前页码
- while True:
- count = redis.find_valid_ip()
- print('*******************************************************************')
- print(f"目前redis里面有{count}个有效ip")
-
- if count < target_count:
- valid_ips, page = get_ip.get_kuaidaili(current_page) # 使用 current_page
- print(f"当前返回的页码:{page}")
-
-
- if valid_ips:
- print(f"有效代理IP有:{len(valid_ips)}")
-
- redis.add_to_sorted_set(valid_ips) #必须添加有序集合,确保唯一性 以及后期提取时可以自动移除
- current_page =page+1 # 更新 current_page,使其递增
- else:
- #此时是redis内的有效ip没达到20个
- print('此时没有达到20个,怎么办?')
-
- else:
- print(f"已经达到目标数量:{target_count},30秒后再更新")
- time.sleep(10)
-
-
-
-
- ip_control()
最后:
当然,此时的`快代理`已经有点不行了.经过我的测试,1~25page 平局只有 8个左右ip有效! 于是需要添加其他的网站------>那么什么是好的免费ip网站呢? 就是一小时内,他的免费ip会更新的,一般都是比较不错的网站!!!
把这代码搞懂了,需要"素材"的可以私