
Van * Python | simple crawling of a planet

2022-01-30 22:39:48  coder_pig

This is day 1 of my participation in the November Writing Challenge. For event details, see: 2021, One Last Writing Challenge.

0x1、Introduction

As in the previous article, 《Van * Python | Simple crawling of a site course》, the reason for crawling is that a paid membership is about to expire (the root cause being poverty T﹏T).

The technical approach is the same: simulated clicking + packet capture. In this article I finally try the automation tool I have wanted to use for a long time → Pyppeteer.

Solemn statement

This article only records research and study of crawler techniques. The script will not be provided, and the crawled data has been deleted and was not distributed. Do not use any of this for illegal purposes; the author bears no responsibility for losses caused by any other illegal use.


0x2、A Quick Look at Pyppeteer

1、What is the relationship with Puppeteer?

Puppeteer is a NodeJS library officially produced by Google that controls Headless Chrome through the DevTools Protocol. Through its API you can directly drive Chrome to simulate most user operations, for UI testing or for crawlers that visit pages and collect data. Pyppeteer can be understood as the Python version of Puppeteer.

2、How does it compare with Selenium?

Compared with Selenium, Pyppeteer needs no tedious environment configuration: on first run it checks whether Chromium is installed and, if not, automatically downloads and configures it for us. Pyppeteer is also built on Python's async feature (Python 3.5+), so much of its execution supports asynchronous operation, which improves efficiency considerably.
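To make the "asynchronous" point concrete, here is a minimal sketch (not from the original article) that fetches two pages concurrently in one browser with asyncio.gather; the URLs and the fetch_title helper are arbitrary examples:

import asyncio
from pyppeteer import launch


async def fetch_title(browser, url):
    # Open a new tab, load the URL, and return the page title
    page = await browser.newPage()
    await page.goto(url)
    title = await page.title()
    await page.close()
    return title


async def main():
    browser = await launch()
    # Both pages load concurrently instead of one after the other
    titles = await asyncio.gather(
        fetch_title(browser, 'https://juejin.cn/'),
        fetch_title(browser, 'https://www.python.org/'),
    )
    print(titles)
    await browser.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())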

3、API documentation

4、Puppeteer Architecture diagram

A brief overview (just to understand, no need to memorize):

  • Puppeteer: communicates with the browser via the DevTools Protocol;
  • Browser: can hold browser contexts;
  • BrowserContext: defines a browser session and can own multiple pages;
  • Page: has at least one frame, the main frame;
  • Frame: has at least one execution context, the default execution context, in which the frame's JavaScript runs;
  • Worker: has a single execution context, making it convenient to interact with WebWorkers;

5、Installing Pyppeteer

  • Step 1: Install the library

pip install pyppeteer

  • Step 2: Install Chromium

Just write any simple program with the pyppeteer library and run it; Chromium will be downloaded automatically. For example, this script takes a screenshot of the Juejin home page:

import asyncio
from pyppeteer import launch


async def screen_shot():
    browser = await launch()
    page = await browser.newPage()
    await page.goto('https://juejin.cn/')
    await page.screenshot({'path': 'juejin.jpg'})
    await browser.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(screen_shot())

Of course, since the download comes from Google's servers, it may not go smoothly and sometimes gets stuck. You can use the Taobao mirror instead: download the archive, extract it, and then specify executablePath when calling launch(). The method is as follows:

# ① Get the original download URL
from pyppeteer import chromium_downloader

# Replace the key with your platform: win32, win64, linux, mac
print(chromium_downloader.downloadURLs.get("win64"))

# Example output:
# https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip

# ② Replace storage.googleapis.com with the Taobao mirror npm.taobao.org/mirrors, e.g.:
# https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip

# You can also browse the mirror and pick a build yourself:
# https://npm.taobao.org/mirrors/chromium-browser-snapshots

# ③ Specify executablePath when calling launch()
await launch({'headless': headless, 'args': launch_args, 'executablePath': './userData'})

Attached: a walkthrough of the code flow

async   # declares a coroutine (asynchronous function)
await   # awaits a time-consuming (awaitable) operation

# Create an event loop and run the screen_shot() coroutine to completion
asyncio.get_event_loop().run_until_complete(screen_shot())

# Create a browser object; a dict of options can be passed in
browser = await launch()

# Create a page object; page operations are performed on this object
page = await browser.newPage()

await page.goto('https://juejin.cn/')          # navigate to the page
await page.screenshot({'path': 'juejin.jpg'})  # take and save a screenshot
await browser.close()                          # close the browser object

That is all I will cover about the API; for more, refer to the official documentation or make good use of a search engine. Next, let's analyze the crawling process.


0x3、Data Crawling

① Visiting the login page

Visit the login page: if you are not logged in, the login panel below is shown; if you are already logged in, you are redirected to the home page automatically.

Crawling flow:

  • Request the login page and check whether the login QR code node exists; if it does, sleep for 10s to wait for the user to scan the code and log in;
  • If there is no QR code, we have been redirected to the home page;
  • Both branches then proceed to group fetching;

② Fetching the groups

In the left panel you can see two sections: created/managed planets and joined planets.

Press F12 and inspect the nodes:

Crawling flow:

  • Locate the two nodes with a CSS selector, collect all planet names and links, and print them so the user can pick which planet to crawl;
  • An example selector: div.created-group > div:nth-child(2) > a

③ Content crawling

At first I only wanted to crawl the posts under the "Essentials" category, but I later found that some planets have nothing there at all:

So I just crawl everything. The content list is paginated: scrolling to the bottom loads more, which is clearly Ajax. Without trying to reverse engineer the API rules, the simplest way to get the data would be: simulated scrolling + parsing nodes.

But in this scenario parsing nodes is too inefficient: there are too many combinations of tags, images, text and links to match, so you would have to write a pile of parsing rules. Intercepting the specific requests is far more brainless and efficient.

So let's look at the characteristics of this Ajax request, such as how the URL is composed, which we will use later to filter requests. Open the Network tab, clear it, select XHR, scroll the page to the bottom, and look at the requests that get loaded:

Open one and check that it is indeed the data we need. We intercept such requests, save the responses locally, and batch-process everything at the end.

④ Two ideas for deciding when to stop scrolling

We keep scrolling down, but we need to determine when all the data has been crawled so we can stop. Here are the author's two ideas:

  • Method 1: infinite loop + asyncio.sleep() + pyppeteer locating the bottom node

That is, loop forever, checking each time whether the bottom node is visible; on this site, scrolling to the bottom reveals the node below (a sketch of this approach follows after it):

<div _ngcontent-isv-c98="" class="no-more">There is no more</div>
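The author did not implement Method 1, but a minimal sketch of it might look like this, assuming `page` is an open pyppeteer Page and the bottom node keeps the no-more class shown above:

import asyncio


async def scroll_until_no_more(page, pause=1.0):
    # Loop forever: scroll to the bottom, give the Ajax list time to load,
    # and stop once the "no more" node scrolls into view.
    while True:
        await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
        await asyncio.sleep(pause)
        visible = await page.evaluate('''() => {
            const el = document.querySelector('div.no-more');
            if (!el) return false;
            const rect = el.getBoundingClientRect();
            return rect.top < window.innerHeight && rect.bottom > 0;
        }''')
        if visible:
            return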
  • Method 2: JS timer + comparing scrolled distance with page height

That is, start a timer that records the scrolled distance and the current page height; when the former >= the latter, we may have reached the bottom.

Yes, only "may have", because the list might simply not have finished loading yet. So add a retry counter: once the number of retries reaches a threshold, consider the crawl complete.

Tips: the author used the second method. Not knowing JS well enough to pause and resume a timer, I simply set the retry threshold very high, which also acts as an indirect sleep.

⑤ Initializing the browser

The process is clear, so let's start writing the crawler. First, initialize the browser:

import asyncio
import os
import time
from pyppeteer import launch

import cp_utils

# Browser launch arguments
launch_args = [
    "--no-sandbox",  # disable the sandbox
    "--disable-infobars",  # hide the info bar
    # Set the User-Agent
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/83.0.4103.97 Safari/537.36 ",
    "--log-level=3",  # log level
]

# Launch the browser
async def init_browser(headless=False):
    return await launch({'headless': headless,
                         'args': launch_args,
                         'userDataDir': './userData',
                         'dumpio': True,
                         'ignoreHTTPSErrors': True})

⑥ Creating a new page

Next, create a new page via browser.newPage(). Besides the general settings, we also add some tweaks to evade WebDriver detection ~

# Create a new page
async def init_page(browser):
    page = await browser.newPage()
    await page.setViewport({'width': 1960, 'height': 1080})  # set the page width and height
    await page.setJavaScriptEnabled(True)
    await prevent_web_driver_check(page)
    return page


# Work around WebDriver detection
async def prevent_web_driver_check(page):
    if page is not None:
        # Hide the webdriver flag
        await page.evaluateOnNewDocument("""() => { Object.defineProperty(navigator, 'webdriver', { get: () => undefined })} """)
        # Some sites detect the browser via js calls; patch the results
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5, 6], }); }''')

⑦ Logging in and pulling the list of planets

Use page.waitForSelector() with a timeout to check whether the login QR code node exists, then branch accordingly:

from pyppeteer import errors  # needed for errors.TimeoutError below


# Log in
async def login(page, timeout=60):
    await page.goto(login_url, options={'timeout': int(timeout * 1000)})
    try:
        await page.waitForSelector('img.qrcode', {'visible': 'visible', 'timeout': 3000})
        # QR code found → not logged in, wait for the user to scan it
        print("Not logged in, waiting for QR code scan...")
        await asyncio.sleep(10)
        await fetch_group(page)
    except errors.TimeoutError:
        print("Already logged in, pulling the planet list directly...")
        await fetch_group(page)


# Fetch the groups (planets)
async def fetch_group(page):
    global choose_group_id, choose_group_name
    # Collect all groups
    group_list = []
    created_groups = await page.JJ('div.created-group > div:nth-child(2) > a')
    joined_groups = await page.JJ('div.joined-group > div:nth-child(2) > a')
    for item in created_groups + joined_groups:
        group_name = await page.evaluate('item => item.textContent', item)
        group_url = await (await item.getProperty('href')).jsonValue()
        group_list.append([group_name.strip(), group_url])
    print("Detected the following planets:")
    for index, group in enumerate(group_list):
        print(index, '、', group)
    choose_group_index = input("Enter the number of the group to crawl (note: indices start at 0): ")
    choose_group = group_list[int(choose_group_index)]
    choose_group_id = choose_group[1].split('/')[-1]
    choose_group_name = choose_group[0]
    await fetch_data(page, choose_group[1])

Running it gives output like this:

⑧ Intercepting requests and responses, and saving the data

# Intercept requests
async def intercept_request(req):
    # Block images, media, eventsource and websocket requests
    if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
        await req.abort()
    else:
        await req.continue_()


# Intercept responses
async def intercept_response(resp):
    resp_type = resp.request.resourceType
    if resp_type in ['xhr'] and 'https://xxx/v2/groups/{}/topics?scope=all&count=20'.format(
            choose_group_id) in resp.url:
        content = await resp.text()
        if len(content) > 0:
            temp_dir = os.path.join(content_save_dir, choose_group_name)
            cp_utils.is_dir_existed(temp_dir)
            print(resp.url + ' → ' + content)
            json_save_path = os.path.join(temp_dir, str(int(time.time() * 1000)) + '.json')
            cp_utils.write_str_data(content, json_save_path)
            print("Saved file:", json_save_path)
    return resp

⑨ Infinite scrolling

# Pull the content
async def fetch_data(page, url, timeout=60):
    # Enable interception of requests and responses
    await page.setRequestInterception(True)
    page.on('request', lambda req: asyncio.ensure_future(intercept_request(req)))
    page.on('response', lambda resp: asyncio.ensure_future(intercept_response(resp)))
    print("Start crawling:", choose_group_name)
    await page.goto(url, options={'timeout': int(timeout * 1000)})
    # Sleep 3 seconds to wait for the page to load
    await asyncio.sleep(3)
    # Keep scrolling down
    await page.evaluate('''async () => {
        await new Promise(((resolve, reject) => {
            // distance scrolled on each tick
            const distance = 100;

            // total distance scrolled so far
            var totalHeight = 0;

            // maximum number of retries and current retry count
            var maxTries = 20000;
            var curTries = 0;

            var timer = setInterval(() => {
                var scrollHeight = document.body.scrollHeight;
                window.scrollBy(0, distance)
                totalHeight += distance
                console.log(totalHeight + "-" + scrollHeight)
                if (totalHeight >= scrollHeight) {
                    if(curTries > maxTries) {
                        clearInterval(timer)
                        resolve();
                    } else {
                        curTries += 1;
                        totalHeight -= distance
                    }
                } else {
                    curTries = 0;
                }
            }, 100)
        }));
    }''')
    print("Planet 【{}】 crawled...".format(choose_group_name))
    # Disable request interception
    await page.setRequestInterception(False)

Finally, call it all:

if __name__ == '__main__':
    cur_browser = asyncio.get_event_loop().run_until_complete(init_browser())
    cur_page = asyncio.get_event_loop().run_until_complete(init_page(cur_browser))
    asyncio.get_event_loop().run_until_complete(login(cur_page))

After running it, you can see the console print the corresponding crawl information:

You can also see the JSON files crawled to the local directory:

Nice, the data is saved locally. Now on to the data processing stage ~


0x4、Data Processing

① Extracting the key data

Open a few of the crawled JSON samples and look at the key parts of the JSON:
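The screenshot of a sample payload is not reproduced here, but judging from the fields the extraction code below reads, the relevant part of each response looks roughly like this (a sketch with placeholder values; the real payload contains many more fields):

# Rough shape of one crawled response (placeholder values; only the fields
# the extraction code below actually reads are shown)
sample = {
    "resp_data": {
        "topics": [
            {
                "talk": {
                    "owner": {"name": "author name"},
                    "text": "post text, possibly containing <e .../> tags",
                    "images": [{"original": {"url": "https://example.com/img"}}],
                    "files": [{"file_id": 123456, "name": "attachment.pdf"}],
                }
            }
        ]
    }
}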

Define an entity class for the extracted fields:

class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files

The next step is to walk through the files, turn each one into a dict with json.loads, and take the fields we need. Simple:

import cp_utils
import json
import os

zsxq_save_dir = os.path.join(os.getcwd(), "zsxq")
result_json_path = os.path.join(os.getcwd(), "zsxq_result.json")
talk_list = []
talk_dict = {'data': None}


#  Data entity 
class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files

    def to_json_str(self):
        return json.dumps({'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files},
                          ensure_ascii=False)

    def to_dict(self):
        return {'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files}


# Extract the contents of a json file
def extract_json_file(file_path):
    global talk_list
    content = cp_utils.read_content_from_file(file_path)
    content_dict = json.loads(content)
    topics = content_dict['resp_data'].get('topics')
    print("Parsing file: {}".format(file_path))
    if topics is not None and len(topics) > 0:
        for topic in topics:
            talk_entity = Talk()
            talk = topic.get('talk')
            if talk is not None:
                # Extract the name, text, images, and files in turn
                owner = talk.get('owner')
                if owner is not None:
                    owner_name = owner.get("name")
                    if owner_name is not None:
                        talk_entity.name = owner_name
                text = talk.get('text')
                if text is not None:
                    talk_entity.text = text
                images = talk.get('images')
                if images is not None and len(images) > 0:
                    image_urls = []
                    for image in images:
                        original = image.get('original')
                        if original is not None:
                            image_urls.append(original.get('url'))
                    talk_entity.images = image_urls
                files = talk.get('files')
                if files is not None and len(files) > 0:
                    file_list = []
                    for file in files:
                        file_id = file.get('file_id')
                        file_name = file.get('name')
                        file_list.append({file_id: file_name})
                    talk_entity.files = file_list
            talk_list.append(talk_entity.to_dict())
    else:
        print(" Data is empty , Skip file ...")


if __name__ == '__main__':
    dir_list = cp_utils.fetch_all_file(zsxq_save_dir)
    print("Directories available:\n")
    for index, path in enumerate(dir_list):
        print("{}、{}".format(index, path))
    choose_index = input("\nEnter the number of the directory to process => ")
    choose_path = dir_list[int(choose_index)]
    print("Currently selected directory: {}".format(choose_path))
    json_file_list = cp_utils.filter_file_type(choose_path, '.json')
    for json_file in json_file_list[:10]:
        extract_json_file(json_file)
    talk_dict['data'] = talk_list
    talk_json = json.dumps(talk_dict, ensure_ascii=False, indent=2)
    cp_utils.write_str_data(talk_json, result_json_path)
    print("File written: {}".format(result_json_path))

Run it over 10 files as a test:

Open the resulting JSON file to check:

② Converting the JSON to Markdown

JSON is definitely not easy to read, so we can generate some Markdown from it. That is just string concatenation; the main difficulty is:

parsing the text field, which mixes hashtags, plain text, external links, emoji...

We can use re.sub() with backreferences to replace the tags and external links. Write a quick test to try it out:

import re
from urllib.parse import unquote

# Replacement regexes
hash_tag_pattern = re.compile(r'(<e type="hashtag" .*? title=")(.*?)(".*?/>)')
web_pattern = re.compile(r'(<e type="web" href=")(.*?)(" title=")(.*?)(" cache=.*?/>)')

# Test case
xml_str = """ <e type="hashtag" hid="51288155841824" title="%23%E6%8A%80%E5%B7%A7%23"/> <e type="hashtag" hid="28518452544211" title="%23%E6%95%88%E7%8E%87%E5%B7%A5%E5%85%B7%23"/>  Today I recommend a command line helper: fig. A picture is worth a thousand words, just look at the picture. Open the official website to install: <e type="web" href="https%3A%2F%2Ffig.io%2Fwelcome" title="Fig+%7C+Welcome+to+Fig" cache=""/> """
temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", xml_str), 'utf-8')
temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
temp_result = temp_result.strip().replace("\n", "")
print(temp_result)

The parsing result looks like this:

Good. Now add handling for images and files:

# Convert to an md file
def json_to_md(file_path):
    content = cp_utils.read_content_from_file(file_path)
    data_list = json.loads(content)['data']
    md_content = ''
    for data in data_list:
        name = data['name']
        if name is not None:
            md_content += name + "\n"
        text = data['text']
        if text is not None:
            temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", text), 'utf-8').replace("#", "`")
            temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
            md_content += temp_result.strip()
        images = data['images']
        if images is not None:
            md_content += '\n'
            for image_url in images:
                # Download the image and reference it by a relative path
                img_file_name = str(int(time.time() * 1000)) + ".jpg"
                img_save_path = os.path.join(image_save_dir, img_file_name)
                cp_utils.download_pic(img_save_path, image_url)
                relative_path = 'images/{}'.format(img_file_name)
                md_content += '![]({})'.format(relative_path)
        files = data['files']
        if files is not None:
            md_content += '\n Files: '
            for file in files:
                file_id = file.get('file_id')
                file_name = file.get('name')
                md_content += "《{}》".format(file_name)
        md_content += '\n\n---\n\n'
    cp_utils.write_str_data(md_content, result_md_path)

Careful readers may have noticed that the images are downloaded locally instead of being referenced remotely. The reason is that the site's image URLs have no image file suffix, so the Markdown renderer does not recognize them and the images would not show up in preview. The generated md file:

46,257 characters. PyCharm freezes as soon as the preview opens, and MarkdownPad2 does not fare much better, stuttering as you scroll. It really should be split into several md files ~
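The original article stops there, but a minimal sketch of one way to do that split might look like this, assuming `entries` holds the per-post markdown strings that json_to_md() currently joins into one big md_content:

import os

# Hypothetical helper: write the converted posts out as several smaller md files
# instead of one huge one. `entries` is assumed to be a list of per-post strings.
def write_md_in_chunks(entries, out_dir, chunk_size=200):
    os.makedirs(out_dir, exist_ok=True)
    for i in range(0, len(entries), chunk_size):
        part_path = os.path.join(out_dir, 'part_{:03d}.md'.format(i // chunk_size + 1))
        with open(part_path, 'w', encoding='utf-8') as f:
            f.write('\n\n---\n\n'.join(entries[i:i + chunk_size]))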


0x5、Summary

This article took crawling a planet as an excuse to give Pyppeteer a proper spin, so crawler skills: Level Up↑. Also note that files here only come with a file_id; the real download URL has to be fetched from another API, and you have to be logged in. Interested readers can try that themselves.

Well, that's all. If you have any questions, please point them out. Thanks ~



Copyright notice

Author: coder_pig. Please include the original link when reprinting, thank you.
https://en.pythonmana.com/2022/01/202201302239277041.html
