利用Python封装爬虫完美采集拼多多商品详情数据 + 商品列表数据 + 商品优惠券数据
下面是一个完整的拼多多数据采集系统,支持采集商品列表、商品详情和商品优惠券数据。系统使用Python实现,采用模块化设计,包含完整的图形用户界面。
拼多多数据采集系统 (Python)
下面是一个完整的拼多多数据采集系统,支持采集商品列表、商品详情和商品优惠券数据。系统使用Python实现,采用模块化设计,包含完整的图形用户界面。
python
import requests
import json
import re
import time
import random
import threading
from urllib.parse import quote
from bs4 import BeautifulSoup
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
from datetime import datetime
import csv
import os
import pandas as pd
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor, as_completed
class PinduoduoSpider:
    """Scraper for Pinduoduo goods lists, goods details and coupon data.

    Wraps a persistent ``requests.Session`` with per-request randomized
    User-Agent headers and optional rotating proxies, and parses result
    pages with BeautifulSoup.
    """

    def __init__(self):
        # One session so cookies / keep-alive connections persist across requests.
        self.session = requests.Session()
        self.ua = UserAgent()
        # Headers shared by every request; 'User-Agent' is added per request
        # by get_random_headers().
        self.base_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache'
        }
        self.proxies = self.load_proxies()

    def load_proxies(self):
        """Return the configured proxy list (empty by default).

        Replace the body with real entries of the form
        ``{'http': 'http://user:pass@ip:port', 'https': '...'}`` for
        production use.
        """
        return [
            # {'http': 'http://user:pass@ip:port', 'https': 'http://user:pass@ip:port'},
            # more proxies...
        ]

    def get_random_proxy(self):
        """Pick one proxy dict at random, or None when no proxies are configured."""
        if self.proxies:
            return random.choice(self.proxies)
        return None

    def get_random_headers(self):
        """Build request headers with a freshly randomized User-Agent."""
        headers = self.base_headers.copy()
        headers['User-Agent'] = self.ua.random
        return headers

    def fetch_html(self, url, max_retries=3):
        """GET *url* and return its HTML text, or None after *max_retries* failures.

        Sleeps a random interval between attempts to reduce the chance of
        triggering rate limiting.
        """
        for _ in range(max_retries):
            try:
                proxy = self.get_random_proxy()
                headers = self.get_random_headers()
                response = self.session.get(url, headers=headers, proxies=proxy, timeout=15)
                if response.status_code == 200:
                    return response.text
                elif response.status_code == 403:
                    print(f"访问被拒绝: {url}")
                # Non-200: brief random pause before the next attempt.
                time.sleep(random.uniform(1, 3))
            except Exception as e:
                # Broad on purpose: network errors, proxy errors and UA lookup
                # failures are all retried the same way, with a longer back-off.
                print(f"请求失败: {e}")
                time.sleep(random.uniform(2, 5))
        return None

    def parse_goods_list(self, keyword, page=1):
        """Scrape one search-result page and return a list of goods dicts.

        Each dict has keys goods_id, title, price, sales, shop and link.
        Returns an empty list when the page cannot be fetched.
        """
        encoded_keyword = quote(keyword)
        url = f"https://search.pinduoduo.com/search?keyword={encoded_keyword}&page={page}"
        html = self.fetch_html(url)
        if not html:
            return []
        goods_list = []
        soup = BeautifulSoup(html, 'html.parser')
        items = soup.select('div[data-index]')
        for item in items:
            try:
                # Goods id may live in a data attribute or in the element id.
                goods_id = item.get('data-pdd-goods-id') or item.get('id', '').replace('item_', '')
                if not goods_id:
                    continue
                # Title
                title_tag = item.select_one('div[class*="title"]') or item.select_one('div[class*="goodsName"]')
                title = title_tag.text.strip() if title_tag else ''
                # Price
                price_tag = item.select_one('div[class*="price"] span') or item.select_one('span[class*="price"]')
                price = price_tag.text.strip() if price_tag else ''
                # Sales count
                sales_tag = item.select_one('div[class*="sales"]') or item.select_one('span[class*="sales"]')
                sales = sales_tag.text.strip() if sales_tag else ''
                # Shop name
                shop_tag = item.select_one('div[class*="shop"]') or item.select_one('div[class*="store"]')
                shop = shop_tag.text.strip() if shop_tag else ''
                # Goods link; protocol-relative hrefs need the scheme prepended.
                link_tag = item.select_one('a[href*="goods_id"]') or item.select_one('a[href*="goods_detail"]')
                if link_tag:
                    link = 'https:' + link_tag['href'] if not link_tag['href'].startswith('http') else link_tag['href']
                else:
                    link = f"https://pinduoduo.com/goods.html?goods_id={goods_id}"
                goods_list.append({
                    'goods_id': goods_id,
                    'title': title,
                    'price': price,
                    'sales': sales,
                    'shop': shop,
                    'link': link
                })
            except Exception as e:
                print(f"解析商品出错: {e}")
        return goods_list

    def parse_goods_detail(self, goods_id):
        """Scrape the mobile detail page of *goods_id*.

        Returns a dict with title, price, origin_price, sales, shop,
        shop_score, description, images, specs and coupons, or None when the
        page cannot be fetched or parsed.
        """
        url = f"https://mobile.yangkeduo.com/goods.html?goods_id={goods_id}"
        html = self.fetch_html(url)
        if not html:
            return None
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Title
            title_tag = soup.select_one('div.goods-title')
            title = title_tag.text.strip() if title_tag else ''
            # Current price
            price_tag = soup.select_one('div.goods-price span.amount')
            price = price_tag.text.strip() if price_tag else ''
            # Original (pre-discount) price
            origin_price_tag = soup.select_one('div.goods-price span.original')
            origin_price = origin_price_tag.text.strip() if origin_price_tag else ''
            # Sales count
            sales_tag = soup.select_one('div.goods-sales span.count')
            sales = sales_tag.text.strip() if sales_tag else ''
            # Shop name
            shop_tag = soup.select_one('div.shop-name')
            shop = shop_tag.text.strip() if shop_tag else ''
            # Shop rating
            shop_score_tag = soup.select_one('div.shop-score')
            shop_score = shop_score_tag.text.strip() if shop_score_tag else ''
            # Description
            desc_tag = soup.select_one('div.goods-desc')
            description = desc_tag.text.strip() if desc_tag else ''
            # Gallery images; lazy-loaded images keep the URL in data-src.
            images = []
            for img_tag in soup.select('div.gallery img'):
                img_src = img_tag.get('src') or img_tag.get('data-src')
                if img_src and not img_src.startswith('http'):
                    img_src = 'https:' + img_src
                if img_src:
                    images.append(img_src)
            # Spec name/value pairs
            specs = []
            for spec_tag in soup.select('div.spec-item'):
                spec_name = spec_tag.select_one('span.spec-name')
                spec_value = spec_tag.select_one('span.spec-value')
                if spec_name and spec_value:
                    specs.append({
                        'name': spec_name.text.strip(),
                        'value': spec_value.text.strip()
                    })
            # Coupons embedded as JSON in the raw HTML
            coupons = self.extract_coupons(html)
            return {
                'goods_id': goods_id,
                'title': title,
                'price': price,
                'origin_price': origin_price,
                'sales': sales,
                'shop': shop,
                'shop_score': shop_score,
                'description': description,
                'images': images,
                'specs': specs,
                'coupons': coupons
            }
        except Exception as e:
            print(f"解析商品详情出错: {e}")
            return None

    def extract_coupons(self, html):
        """Extract coupon records embedded as JSON objects inside *html*.

        Looks for ``{"coupon":{...}}`` fragments in inline scripts.

        Bug fix: the previous pattern ``{"coupon":{.*?}`` stopped at the
        first ``}`` and produced unbalanced JSON, so ``json.loads`` always
        failed (silently, via a bare except) and no coupon was ever
        returned.  The pattern below captures the full balanced object.
        """
        coupons = []
        try:
            # Match the whole two-level object: outer braces plus a flat
            # inner object with no nested braces.
            pattern = r'\{"coupon":\s*\{[^{}]*\}\}'
            matches = re.findall(pattern, html)
            for match in matches:
                try:
                    # Undo backslash-escaping when the JSON sits inside a JS string.
                    json_str = match.replace('\\"', '"')
                    data = json.loads(json_str)
                    coupon_info = data.get('coupon', {})
                    coupons.append({
                        'coupon_id': coupon_info.get('coupon_id', ''),
                        'discount': coupon_info.get('discount', ''),
                        'min_order_amount': coupon_info.get('min_order_amount', ''),
                        'quantity': coupon_info.get('quantity', ''),
                        'sold_quantity': coupon_info.get('sold_quantity', ''),
                        'start_time': coupon_info.get('start_time', ''),
                        'end_time': coupon_info.get('end_time', '')
                    })
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError: skip fragments
                    # that still are not valid JSON.
                    continue
        except Exception as e:
            print(f"提取优惠券信息出错: {e}")
        return coupons

    def search_goods(self, keyword, max_pages=1):
        """Scrape up to *max_pages* result pages for *keyword*.

        Stops early as soon as a page yields no items.  Returns the combined
        list of goods dicts.
        """
        all_goods = []
        for page in range(1, max_pages + 1):
            print(f"正在爬取第 {page} 页...")
            goods_list = self.parse_goods_list(keyword, page)
            if not goods_list:
                break
            all_goods.extend(goods_list)
            # Polite delay between page requests.
            time.sleep(random.uniform(1, 2))
        return all_goods

    def get_goods_details(self, goods_ids, max_workers=5):
        """Fetch details for every id in *goods_ids* concurrently.

        Uses a thread pool of *max_workers*; failed or empty fetches are
        skipped, so the result may be shorter than the input.
        """
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_goods = {executor.submit(self.parse_goods_detail, goods_id): goods_id
                               for goods_id in goods_ids}
            for future in as_completed(future_to_goods):
                goods_id = future_to_goods[future]
                try:
                    detail = future.result()
                    if detail:
                        results.append(detail)
                except Exception as e:
                    print(f"获取商品 {goods_id} 详情出错: {e}")
        return results
class PinduoduoCrawlerApp:
    """Tkinter front-end for PinduoduoSpider.

    Presents search controls, a result table, a detail preview pane, a
    status bar and export buttons.  Scraping runs on daemon threads so the
    UI stays responsive.
    """

    def __init__(self, root):
        self.root = root
        self.root.title("拼多多数据采集系统 v1.0")
        self.root.geometry("1000x700")
        self.root.resizable(True, True)
        # Scraper backend shared by all actions.
        self.spider = PinduoduoSpider()
        # Build the UI.
        self.create_widgets()
        # State flags coordinating the worker threads with the GUI.
        self.is_crawling = False    # True while a search/detail job is running
        self.crawling_stop = False  # set by stop_crawling() to request cancellation
        self.current_data = []      # goods dicts backing the tree view

    def create_widgets(self):
        """Create and lay out all widgets, and bind the selection handler."""
        # Main container frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        # Search settings area
        search_frame = ttk.LabelFrame(main_frame, text="搜索设置", padding="10")
        search_frame.pack(fill=tk.X, padx=5, pady=5)
        ttk.Label(search_frame, text="搜索关键词:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
        self.keyword_entry = ttk.Entry(search_frame, width=40)
        self.keyword_entry.grid(row=0, column=1, padx=5, pady=5, sticky=tk.W)
        self.keyword_entry.insert(0, "手机")
        ttk.Label(search_frame, text="爬取页数:").grid(row=0, column=2, padx=5, pady=5, sticky=tk.W)
        self.page_spinbox = ttk.Spinbox(search_frame, from_=1, to=50, width=5)
        self.page_spinbox.grid(row=0, column=3, padx=5, pady=5, sticky=tk.W)
        self.page_spinbox.set(2)
        ttk.Label(search_frame, text="最大线程数:").grid(row=0, column=4, padx=5, pady=5, sticky=tk.W)
        self.thread_spinbox = ttk.Spinbox(search_frame, from_=1, to=20, width=5)
        self.thread_spinbox.grid(row=0, column=5, padx=5, pady=5, sticky=tk.W)
        self.thread_spinbox.set(5)
        search_btn = ttk.Button(search_frame, text="开始搜索", command=self.start_search)
        search_btn.grid(row=0, column=6, padx=10, pady=5)
        # Result display area
        data_frame = ttk.LabelFrame(main_frame, text="数据展示", padding="10")
        data_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # Tree view: one row per goods item
        columns = ("goods_id", "title", "price", "sales", "shop")
        self.tree = ttk.Treeview(data_frame, columns=columns, show="headings", height=15)
        # Column headings
        self.tree.heading("goods_id", text="商品ID")
        self.tree.heading("title", text="商品标题")
        self.tree.heading("price", text="价格")
        self.tree.heading("sales", text="销量")
        self.tree.heading("shop", text="店铺")
        # Column widths
        self.tree.column("goods_id", width=100)
        self.tree.column("title", width=300)
        self.tree.column("price", width=80)
        self.tree.column("sales", width=100)
        self.tree.column("shop", width=150)
        # Vertical scrollbar for the tree
        scrollbar = ttk.Scrollbar(data_frame, orient=tk.VERTICAL, command=self.tree.yview)
        self.tree.configure(yscroll=scrollbar.set)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.tree.pack(fill=tk.BOTH, expand=True)
        # Detail preview area (read-only text widget)
        detail_frame = ttk.LabelFrame(main_frame, text="商品详情", padding="10")
        detail_frame.pack(fill=tk.X, padx=5, pady=5)
        self.detail_text = scrolledtext.ScrolledText(detail_frame, height=10)
        self.detail_text.pack(fill=tk.BOTH, expand=True)
        self.detail_text.config(state=tk.DISABLED)
        # Status bar
        status_frame = ttk.Frame(main_frame)
        status_frame.pack(fill=tk.X, padx=5, pady=5)
        self.status_var = tk.StringVar()
        self.status_var.set("就绪")
        status_label = ttk.Label(status_frame, textvariable=self.status_var)
        status_label.pack(side=tk.LEFT)
        # Action buttons
        button_frame = ttk.Frame(main_frame)
        button_frame.pack(fill=tk.X, padx=5, pady=5)
        fetch_btn = ttk.Button(button_frame, text="获取详情数据", command=self.fetch_details)
        fetch_btn.pack(side=tk.LEFT, padx=5)
        export_btn = ttk.Button(button_frame, text="导出数据", command=self.export_data)
        export_btn.pack(side=tk.LEFT, padx=5)
        clear_btn = ttk.Button(button_frame, text="清空数据", command=self.clear_data)
        clear_btn.pack(side=tk.LEFT, padx=5)
        stop_btn = ttk.Button(button_frame, text="停止采集", command=self.stop_crawling)
        stop_btn.pack(side=tk.RIGHT, padx=5)
        # Show details when the tree selection changes
        self.tree.bind("<<TreeviewSelect>>", self.show_detail)

    def start_search(self):
        """Validate the search inputs and launch the search worker thread."""
        if self.is_crawling:
            messagebox.showwarning("警告", "当前正在采集数据,请稍后再试")
            return
        keyword = self.keyword_entry.get().strip()
        if not keyword:
            messagebox.showwarning("警告", "请输入搜索关键词")
            return
        try:
            pages = int(self.page_spinbox.get())
            if pages <= 0:
                raise ValueError
        except:
            messagebox.showwarning("警告", "请输入有效的页数")
            return
        # Discard any previous results
        self.clear_data()
        # Mark the job as running
        self.is_crawling = True
        self.crawling_stop = False
        self.status_var.set(f"正在搜索: {keyword}...")
        # Daemon thread so the app can exit while a search is in flight
        threading.Thread(target=self.do_search, args=(keyword, pages), daemon=True).start()

    def do_search(self, keyword, pages):
        """Worker-thread body: run the search and populate the tree view.

        NOTE(review): this updates tkinter widgets from a non-main thread;
        tkinter is not thread-safe — consider marshalling updates through
        ``root.after``.
        """
        try:
            goods_list = self.spider.search_goods(keyword, pages)
            if self.crawling_stop:
                self.status_var.set("搜索已停止")
                self.is_crawling = False
                return
            if not goods_list:
                self.status_var.set("未找到相关商品")
                self.is_crawling = False
                return
            # Fill the tree view, one row per goods item
            for goods in goods_list:
                self.tree.insert("", tk.END, values=(
                    goods['goods_id'],
                    goods['title'],
                    goods['price'],
                    goods['sales'],
                    goods['shop']
                ))
            # Keep the raw data for detail fetches and export
            self.current_data = goods_list
            self.status_var.set(f"搜索完成,共找到 {len(goods_list)} 件商品")
        except Exception as e:
            self.status_var.set(f"搜索出错: {str(e)}")
        finally:
            self.is_crawling = False

    def fetch_details(self):
        """Collect the selected goods ids and launch the detail worker thread."""
        if not self.current_data:
            messagebox.showwarning("警告", "没有可获取详情的商品数据")
            return
        if self.is_crawling:
            messagebox.showwarning("警告", "当前正在采集数据,请稍后再试")
            return
        # Selected tree rows
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "请选择要获取详情的商品")
            return
        # First tree column holds the goods id
        goods_ids = []
        for item in selected_items:
            values = self.tree.item(item, "values")
            if values:
                goods_ids.append(values[0])
        if not goods_ids:
            messagebox.showwarning("警告", "未找到有效的商品ID")
            return
        # Mark the job as running
        self.is_crawling = True
        self.crawling_stop = False
        self.status_var.set(f"正在获取 {len(goods_ids)} 件商品的详情...")
        # Daemon thread for the detail fetch
        threading.Thread(target=self.do_fetch_details, args=(goods_ids,), daemon=True).start()

    def do_fetch_details(self, goods_ids):
        """Worker-thread body: fetch details and attach them to current_data.

        NOTE(review): like do_search, this touches tkinter state from a
        worker thread — confirm this is acceptable or route via root.after.
        """
        try:
            # Thread count from the spinbox; fall back to 5 on bad input
            try:
                max_workers = int(self.thread_spinbox.get())
            except:
                max_workers = 5
            details = self.spider.get_goods_details(goods_ids, max_workers)
            if self.crawling_stop:
                self.status_var.set("获取详情已停止")
                self.is_crawling = False
                return
            if not details:
                self.status_var.set("未获取到商品详情")
                self.is_crawling = False
                return
            # Attach each detail dict to its matching goods entry
            for detail in details:
                for goods in self.current_data:
                    if goods['goods_id'] == detail['goods_id']:
                        goods['detail'] = detail
                        break
            self.status_var.set(f"成功获取 {len(details)} 件商品的详情")
        except Exception as e:
            self.status_var.set(f"获取详情出错: {str(e)}")
        finally:
            self.is_crawling = False

    def show_detail(self, event):
        """Selection handler: render the first selected goods item in the detail pane."""
        # Clear the (normally read-only) detail widget
        self.detail_text.config(state=tk.NORMAL)
        self.detail_text.delete(1.0, tk.END)
        # Current tree selection
        selected_items = self.tree.selection()
        if not selected_items:
            return
        # Only the first selected row is shown
        item = selected_items[0]
        values = self.tree.item(item, "values")
        if not values:
            return
        goods_id = values[0]
        # Find the backing data for this row
        for goods in self.current_data:
            if goods['goods_id'] == goods_id:
                # Base listing fields
                detail_text = f"商品ID: {goods_id}\n"
                detail_text += f"标题: {goods['title']}\n"
                detail_text += f"价格: {goods['price']}\n"
                detail_text += f"销量: {goods['sales']}\n"
                detail_text += f"店铺: {goods['shop']}\n"
                # Extra fields, present only after a detail fetch
                if 'detail' in goods:
                    detail = goods['detail']
                    detail_text += "\n===== 商品详情 =====\n"
                    detail_text += f"原价: {detail.get('origin_price', '')}\n"
                    detail_text += f"店铺评分: {detail.get('shop_score', '')}\n"
                    detail_text += f"描述: {detail.get('description', '')}\n"
                    # Spec name/value pairs
                    if detail.get('specs'):
                        detail_text += "\n规格:\n"
                        for spec in detail['specs']:
                            detail_text += f" {spec['name']}: {spec['value']}\n"
                    # Coupon summary lines
                    if detail.get('coupons'):
                        detail_text += "\n优惠券:\n"
                        for coupon in detail['coupons']:
                            detail_text += f" 满{coupon['min_order_amount']}减{coupon['discount']}, "
                            detail_text += f"有效期: {coupon['start_time']}至{coupon['end_time']}\n"
                self.detail_text.insert(tk.END, detail_text)
                self.detail_text.config(state=tk.DISABLED)
                break

    def export_data(self):
        """Export current_data to a user-chosen JSON, CSV or Excel file."""
        if not self.current_data:
            messagebox.showwarning("警告", "没有可导出的数据")
            return
        # Whether any goods carries fetched detail columns
        has_details = any('detail' in goods for goods in self.current_data)
        # Ask the user for a destination path; extension selects the format
        file_path = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            filetypes=[("Excel 文件", "*.xlsx"), ("CSV 文件", "*.csv"), ("JSON 文件", "*.json")],
            title="保存数据"
        )
        if not file_path:
            return
        try:
            if file_path.endswith('.json'):
                # JSON: dump the raw structure as-is
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(self.current_data, f, ensure_ascii=False, indent=2)
            elif file_path.endswith('.csv'):
                # CSV: flatten nested detail fields into joined strings
                with open(file_path, 'w', encoding='utf-8', newline='') as f:
                    writer = csv.writer(f)
                    # Header row
                    headers = ['goods_id', 'title', 'price', 'sales', 'shop', 'link']
                    if has_details:
                        headers.extend([
                            'origin_price', 'shop_score', 'description',
                            'specs', 'coupons', 'images'
                        ])
                    writer.writerow(headers)
                    # One row per goods item
                    for goods in self.current_data:
                        row = [
                            goods.get('goods_id', ''),
                            goods.get('title', ''),
                            goods.get('price', ''),
                            goods.get('sales', ''),
                            goods.get('shop', ''),
                            goods.get('link', '')
                        ]
                        if has_details and 'detail' in goods:
                            detail = goods['detail']
                            row.extend([
                                detail.get('origin_price', ''),
                                detail.get('shop_score', ''),
                                detail.get('description', ''),
                                '; '.join([f"{s['name']}:{s['value']}" for s in detail.get('specs', [])]),
                                '; '.join([f"满{c['min_order_amount']}减{c['discount']}" for c in detail.get('coupons', [])]),
                                '; '.join(detail.get('images', []))
                            ])
                        else:
                            # Pad so every row has the same column count
                            row.extend([''] * 6)
                        writer.writerow(row)
            else:
                # Excel: build flat dicts and let pandas write the sheet
                data = []
                for goods in self.current_data:
                    item = {
                        'goods_id': goods.get('goods_id', ''),
                        'title': goods.get('title', ''),
                        'price': goods.get('price', ''),
                        'sales': goods.get('sales', ''),
                        'shop': goods.get('shop', ''),
                        'link': goods.get('link', '')
                    }
                    if has_details and 'detail' in goods:
                        detail = goods['detail']
                        item.update({
                            'origin_price': detail.get('origin_price', ''),
                            'shop_score': detail.get('shop_score', ''),
                            'description': detail.get('description', ''),
                            'specs': '; '.join([f"{s['name']}:{s['value']}" for s in detail.get('specs', [])]),
                            'coupons': '; '.join([f"满{c['min_order_amount']}减{c['discount']}" for c in detail.get('coupons', [])]),
                            'images': '; '.join(detail.get('images', []))
                        })
                    else:
                        item.update({
                            'origin_price': '',
                            'shop_score': '',
                            'description': '',
                            'specs': '',
                            'coupons': '',
                            'images': ''
                        })
                    data.append(item)
                df = pd.DataFrame(data)
                df.to_excel(file_path, index=False)
            self.status_var.set(f"数据已成功导出到: {file_path}")
            messagebox.showinfo("成功", f"数据已成功导出到:\n{file_path}")
        except Exception as e:
            messagebox.showerror("导出失败", f"导出数据时出错:\n{str(e)}")

    def clear_data(self):
        """Empty the tree view, the detail pane and the in-memory data."""
        self.tree.delete(*self.tree.get_children())
        self.detail_text.config(state=tk.NORMAL)
        self.detail_text.delete(1.0, tk.END)
        self.detail_text.config(state=tk.DISABLED)
        self.current_data = []
        self.status_var.set("数据已清空")

    def stop_crawling(self):
        """Request cancellation of the running job (checked by the workers)."""
        if self.is_crawling:
            self.crawling_stop = True
            self.status_var.set("正在停止采集...")
        else:
            self.status_var.set("当前没有进行中的采集任务")
if __name__ == "__main__":
    # Create the Tk root window, attach the crawler GUI and enter the event loop.
    main_window = tk.Tk()
    app = PinduoduoCrawlerApp(main_window)
    main_window.mainloop()
系统功能说明
这个拼多多数据采集系统具有以下功能:
-
商品列表数据采集:
-
支持关键词搜索
-
可设置爬取页数
-
显示商品ID、标题、价格、销量和店铺信息
-
-
商品详情数据采集:
-
获取商品原价、描述、规格等详细信息
-
多线程采集提高效率
-
支持选择特定商品获取详情
-
-
商品优惠券数据采集:
-
提取商品相关的优惠券信息
-
显示优惠券金额、使用条件和有效期
-
-
数据导出功能:
-
支持导出为Excel、CSV或JSON格式
-
导出包含商品列表、详情和优惠券的完整数据
-
-
图形用户界面:
-
直观的操作界面
-
实时显示采集状态
-
商品详情预览功能
-
使用说明
-
在搜索框中输入关键词(如"手机")
-
设置需要爬取的页数(默认2页)
-
点击"开始搜索"按钮获取商品列表
-
选择商品后点击"获取详情数据"提取详细信息
-
点击"导出数据"将结果保存为文件
技术特点
-
反反爬策略:
-
使用随机User-Agent
-
支持代理IP(需自行配置)
-
随机延迟请求
-
-
多线程采集:
-
使用ThreadPoolExecutor实现并发
-
可自定义线程数量
-
-
模块化设计:
-
爬虫逻辑与GUI分离
-
易于维护和扩展
-
-
数据完整性:
-
同时采集商品列表、详情和优惠券数据
-
多种导出格式支持
-
注意事项
-
实际使用时需要配置有效的代理IP(在load_proxies方法中)
-
拼多多网页结构可能变化,需要定期更新解析逻辑
-
请合理控制爬取速度,避免对目标网站造成过大压力
更多推荐


所有评论(0)