Multithreaded Crawler in Python
In this article, we will describe how to build a simple multithreading-based crawler with Python.
Modules Needed
bs4: Beautiful Soup (bs4) is a Python library for pulling data out of HTML and XML files. To install this library, type the following command in your IDE/terminal.
pip install bs4
requests: This library lets you send HTTP/1.1 requests very easily. To install this library, type the following command in your IDE/terminal.
pip install requests
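As a quick, hedged sanity check that both libraries are installed (the HTML fragment and variable names below are purely illustrative and not part of the crawler), you can parse a small string with Beautiful Soup and issue a single GET request:
Python3
# Minimal usage sketch for bs4 and requests (illustrative only)
import requests
from bs4 import BeautifulSoup

# Parse an inline HTML fragment and read its paragraph text
sample_html = "<html><body><p>Hello, crawler!</p></body></html>"
soup = BeautifulSoup(sample_html, "html.parser")
print(soup.find('p').text)           # -> Hello, crawler!

# Send a simple GET request and inspect the status code
response = requests.get("https://www.geeksforgeeks.org/", timeout=10)
print(response.status_code)          # -> 200 on success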
Step-by-step Implementation
Step 1: We will first import all the libraries needed for crawling. If you are on Python3, you should already have all the libraries except BeautifulSoup and requests, so if you have not installed these two libraries yet, install them using the commands specified above.
Python3
import multiprocessing
from bs4 import BeautifulSoup
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
import requests
Step 2: Create the main program, then create an object of the class MultiThreadedCrawler, pass the seed URL to its parameterized constructor, and call the run_web_crawler() method.
Python3
if __name__ == '__main__':
    cc = MultiThreadedCrawler("https://www.geeksforgeeks.org/")
    cc.run_web_crawler()
    cc.info()
Step 3: Create a class named MultiThreadedCrawler. Initialize all the variables in the constructor, assigning the base URL to an instance variable named seed_url. Then format the base URL into an absolute root URL consisting of its scheme (e.g. https) and its network location (domain).
To execute the crawl-frontier tasks concurrently, use multithreading in Python: create an object of the ThreadPoolExecutor class and set max_workers to 5, i.e. execute 5 threads at a time. To avoid visiting the same web page twice, maintain a set data structure as the history of visited pages.
Create a queue to store all the URLs of the crawl frontier and put the first item, the seed URL, into it.
Python3
class MultiThreadedCrawler:

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.root_url = '{}://{}'.format(urlparse(self.seed_url).scheme,
                                         urlparse(self.seed_url).netloc)
        self.pool = ThreadPoolExecutor(max_workers=5)
        self.scraped_pages = set([])
        self.crawl_queue = Queue()
        self.crawl_queue.put(self.seed_url)
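To see how these building blocks cooperate, here is a short standalone sketch (the URLs and the fake_fetch function are made up for illustration): a Queue acts as the frontier, a set records what has already been submitted, and a 5-worker ThreadPoolExecutor runs the jobs concurrently.
Python3
# Illustrative sketch: a 5-worker thread pool consuming URLs from a queue
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

def fake_fetch(url):                 # stand-in for a real scraping function
    return 'fetched ' + url

frontier = Queue()
for u in ['https://example.com/a', 'https://example.com/b',
          'https://example.com/c']:
    frontier.put(u)

seen = set()                         # history, like scraped_pages
pool = ThreadPoolExecutor(max_workers=5)
futures = []
while not frontier.empty():
    url = frontier.get()
    if url not in seen:              # skip duplicates
        seen.add(url)
        futures.append(pool.submit(fake_fetch, url))

for f in futures:
    print(f.result())                # results arrive as each thread finishes
pool.shutdown()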
Step 4: Create a method named run_web_crawler() that keeps adding the links found to the frontier and extracting the information: use an infinite while loop and display the name of the currently executing process.
Get a URL from the crawl frontier, with a lookup timeout of 60 seconds, and check whether the current URL has already been visited. If it has not been visited yet, add the current URL to the scraped_pages set to record it in the history of visited pages, then submit the scrape_page method and the target URL to the thread pool.
Python3
    def run_web_crawler(self):
        while True:
            try:
                print("\n Name of the current executing process: ",
                      multiprocessing.current_process().name, '\n')
                target_url = self.crawl_queue.get(timeout=60)
                if target_url not in self.scraped_pages:
                    print("Scraping URL: {}".format(target_url))
                    self.scraped_pages.add(target_url)
                    job = self.pool.submit(self.scrape_page, target_url)
                    job.add_done_callback(self.post_scrape_callback)
            except Empty:
                return
            except Exception as e:
                print(e)
                continue
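The loop's exit condition relies on Queue.get raising queue.Empty once no URL arrives within the timeout. A tiny hedged sketch of that behaviour (using a 2-second timeout instead of 60, purely for illustration):
Python3
# Illustrative: Queue.get raises Empty after the timeout expires
from queue import Queue, Empty

q = Queue()
q.put('https://example.com/')
try:
    print(q.get(timeout=2))          # returns the queued URL immediately
    print(q.get(timeout=2))          # nothing left: waits ~2 s, then raises
except Empty:
    print('Frontier is empty - the crawler would stop here')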
Step 5: Initiate the request, specifying a connection (handshake) timeout of 3 seconds and a read timeout of 30 seconds; if the request succeeds, return the response object.
Python3
    def scrape_page(self, url):
        try:
            res = requests.get(url, timeout=(3, 30))
            return res
        except requests.RequestException:
            return
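In requests, a two-element timeout tuple is interpreted as (connect timeout, read timeout). A brief standalone sketch (the URL is just an example):
Python3
# Illustrative: (connect timeout, read timeout) in requests
import requests

try:
    # Wait at most 3 s to establish the connection and 30 s for the response
    res = requests.get("https://www.geeksforgeeks.org/", timeout=(3, 30))
    print(res.status_code)
except requests.RequestException as exc:
    print('Request failed:', exc)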
Step 6: Create a method named scrape_info() and pass the web page data into BeautifulSoup, which helps us organize and format the messy web page data by fixing bad HTML and presenting it in an easily traversable structure.
Then use the BeautifulSoup operator to extract all the paragraph text present in the HTML document.
Python3
    def scrape_info(self, html):
        soup = BeautifulSoup(html, "html5lib")
        web_page_paragraph_contents = soup('p')
        text = ''
        for para in web_page_paragraph_contents:
            if not ('https:' in str(para.text)):
                text = text + str(para.text).strip()
        print('\n <-----Text Present in The WebPage is--->\n', text, '\n')
        return
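Note that this method asks for the html5lib parser, which is installed separately (pip install html5lib); calling the soup object like soup('p') is simply shorthand for soup.find_all('p'). A minimal sketch on an inline HTML fragment (content invented for illustration; the built-in html.parser is used here so it runs without extra dependencies):
Python3
# Illustrative: soup('p') is equivalent to soup.find_all('p')
from bs4 import BeautifulSoup

html = "<p>First paragraph.</p><p>See https://example.com</p>"
soup = BeautifulSoup(html, "html.parser")

text = ''
for para in soup('p'):               # same result as soup.find_all('p')
    if 'https:' not in para.text:    # skip paragraphs containing links
        text += para.text.strip()
print(text)                          # -> First paragraph.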
Step 7: Create a method named parse_links(). Use the BeautifulSoup operator to extract all the anchor tags present in the HTML document. soup.find_all('a', href=True) returns a list of items containing all the anchor tags present in the web page; store them in a list named Anchor_Tags. For every anchor tag in the Anchor_Tags list, retrieve the value associated with href in the tag using link['href']. For every retrieved URL, check whether it is an absolute URL or a relative URL:
- Relative URL: a URL without the root URL and protocol name.
- Absolute URL: a URL with the protocol name, root URL, and document name.
If it is a relative URL, change it to an absolute URL with the urljoin method, using the base URL and the relative URL (a short urljoin sketch follows the code below). Then check whether the current URL has already been visited; if it has not been visited, put it in the crawl queue.
Python3
    def parse_links(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        Anchor_Tags = soup.find_all('a', href=True)
        for link in Anchor_Tags:
            url = link['href']
            if url.startswith('/') or url.startswith(self.root_url):
                url = urljoin(self.root_url, url)
                if url not in self.scraped_pages:
                    self.crawl_queue.put(url)
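Here is the promised urljoin sketch (the paths are made up for illustration), showing how a relative URL is resolved against the root URL while an already-absolute URL passes through unchanged:
Python3
# Illustrative: resolving relative URLs against the root URL
from urllib.parse import urljoin

root_url = 'https://www.geeksforgeeks.org'
print(urljoin(root_url, '/python-programming-language/'))
# -> https://www.geeksforgeeks.org/python-programming-language/
print(urljoin(root_url, 'https://www.geeksforgeeks.org/about/'))
# -> https://www.geeksforgeeks.org/about/  (absolute URLs are unchanged)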
Step 8: To extract the links, call the method named parse_links() and pass it the result; to extract the content, call the method named scrape_info() and pass it the result.
Python3
    def post_scrape_callback(self, res):
        result = res.result()
        if result and result.status_code == 200:
            self.parse_links(result.text)
            self.scrape_info(result.text)
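For context, add_done_callback hands the finished Future object to the callback, and calling .result() on it returns whatever the submitted function returned (here, the requests response from scrape_page). A small standalone sketch with a made-up worker function:
Python3
# Illustrative: how add_done_callback hands the Future to a callback
from concurrent.futures import ThreadPoolExecutor

def work(x):
    return x * 2                     # stand-in for scrape_page

def on_done(future):
    # future.result() is the return value of work(), here 42
    print('callback got:', future.result())

pool = ThreadPoolExecutor(max_workers=1)
job = pool.submit(work, 21)
job.add_done_callback(on_done)
pool.shutdown(wait=True)             # wait for the job and its callback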
Below is the complete implementation:
Python3
import multiprocessing
from bs4 import BeautifulSoup
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
import requests
class MultiThreadedCrawler:

    def __init__(self, seed_url):
        self.seed_url = seed_url
        # Root URL (scheme + domain) derived from the seed URL
        self.root_url = '{}://{}'.format(urlparse(self.seed_url).scheme,
                                         urlparse(self.seed_url).netloc)
        # Thread pool running up to 5 scraping jobs concurrently
        self.pool = ThreadPoolExecutor(max_workers=5)
        # History of visited pages and the crawl frontier
        self.scraped_pages = set([])
        self.crawl_queue = Queue()
        self.crawl_queue.put(self.seed_url)

    def parse_links(self, html):
        # Collect same-domain links and push unseen ones onto the frontier
        soup = BeautifulSoup(html, 'html.parser')
        Anchor_Tags = soup.find_all('a', href=True)
        for link in Anchor_Tags:
            url = link['href']
            if url.startswith('/') or url.startswith(self.root_url):
                url = urljoin(self.root_url, url)
                if url not in self.scraped_pages:
                    self.crawl_queue.put(url)

    def scrape_info(self, html):
        # Extract and print the paragraph text of the page
        soup = BeautifulSoup(html, "html5lib")
        web_page_paragraph_contents = soup('p')
        text = ''
        for para in web_page_paragraph_contents:
            if not ('https:' in str(para.text)):
                text = text + str(para.text).strip()
        print('\n <---Text Present in The WebPage is --->\n', text, '\n')
        return

    def post_scrape_callback(self, res):
        # Runs when a scraping job finishes; res is the completed Future
        result = res.result()
        if result and result.status_code == 200:
            self.parse_links(result.text)
            self.scrape_info(result.text)

    def scrape_page(self, url):
        # Fetch the page: 3 s connect timeout, 30 s read timeout
        try:
            res = requests.get(url, timeout=(3, 30))
            return res
        except requests.RequestException:
            return

    def run_web_crawler(self):
        while True:
            try:
                print("\n Name of the current executing process: ",
                      multiprocessing.current_process().name, '\n')
                target_url = self.crawl_queue.get(timeout=60)
                if target_url not in self.scraped_pages:
                    print("Scraping URL: {}".format(target_url))
                    self.current_scraping_url = "{}".format(target_url)
                    self.scraped_pages.add(target_url)
                    job = self.pool.submit(self.scrape_page, target_url)
                    job.add_done_callback(self.post_scrape_callback)
            except Empty:
                # No new URL arrived within 60 seconds: stop crawling
                return
            except Exception as e:
                print(e)
                continue

    def info(self):
        print('\n Seed URL is: ', self.seed_url, '\n')
        print('Scraped pages are: ', self.scraped_pages, '\n')


if __name__ == '__main__':
    cc = MultiThreadedCrawler("https://www.geeksforgeeks.org/")
    cc.run_web_crawler()
    cc.info()
Output: