Асинхронное программирование. Почему не парсит все ссылки? Здравствуйте!
Задача: Спарсить 78 файлов, в каждом из которых по 50000 ссылок.
Проблема: Переменная total выводит 700-1500, от сюда следует, что выполняются запросы не по всем ссылкам , хотя прокси раздается для каждой. И как заморозить программу при ошибке 429 и 503? Использовал time.sleep() и asyncio.sleep(). Но не то, не другое не сработало.fields = {
'B1': 'Полное наименование Заявителя',
'C1': 'Адрес',
'D1': 'ОГРН',
'E1': 'Должность руководителя',
'F1': 'ФИО',
'G1': 'Телефон',
'H1': 'Эл. Почта',
'I1': 'Рег. номер декларации',
'J1': 'Дата начала действия',
'K1': 'Дата окончания',
'L1': 'Схема',
'M1': 'Статус',
'N1': 'Сведения о документах, на основании которых изготовлена продукция',
'O1': 'ТНВЭД'
}
def get_data(page):
pass
async def get(url, session, proxy):
global total
try:
async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp:
total += 1
if resp.status == 200:
return await resp.text()
else:
write_to_file('bad.txt', url)
'''if resp.status == 429:
print('[429] waiting for 30 seconds')
await asyncio.sleep(time_error429)
if resp.status == 503:
print('[503] waiting for 2 minutes')
await asyncio.sleep(time_error503)'''
except Exception as e:
#print('bad proxy', e)
pass
total = 0
async def save_data_from(url, filename, session, proxy):
try:
async with asyncio.Semaphore(semaphore):
try:
page = await get(url, session, proxy)
data = get_data(page)
if data:
text = ''
for f in sorted(fields.values()):
text += f + ':'
text += data[f] + ' '
print(text)
write_to_excel(data)
except Exception as e:
#print('save_data_from', e)
pass
except:
pass
async def run():
names = get_names_of('pages/')
it = iter(names)
cur = 'pages/{}'.format(next(it))
tasks = []
urls = open(cur, encoding='utf-8').readlines()
async with ClientSession(headers=headers) as session:
for i, url in enumerate(urls):
if i % 10:
proxy = proxy_local.get_proxy()
task = asyncio.ensure_future(save_data_from(url, cur, session, proxy))
tasks.append(task)
end_of_file = url
#remove_if_done(cur, end_of_file)
return await asyncio.gather(*tasks)
if __name__ == '__main__':
proxy_local = Proxy()
user_agent = UserAgent()
save = Save()
semaphore, time_error429, time_error503 = read_conf()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print('total',total)

21 Авг 2019 в 06:23
285 +1
0
Ответы
1

Проблема может быть связана с тем, что у вас используется совместное использование переменной total в нескольких асинхронных функциях, что может привести к непредсказуемым результатам. Чтобы избежать этого, рекомендуется использовать Asyncio Lock для защиты критических областей кода, включая изменение общих переменных.

Пример:

async def get(url, session, proxy):
global total
try:
async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp:
async with total_lock:
total += 1
if resp.status == 200:
return await resp.text()
else:
write_to_file('bad.txt', url)
except Exception as e:
pass
total = 0
total_lock = asyncio.Lock()
async def save_data_from(url, filename, session, proxy):
try:
async with asyncio.Semaphore(semaphore):
try:
page = await get(url, session, proxy)
data = get_data(page)
if data:
text = ''
for f in sorted(fields.values()):
text += f + ':'
text += data[f] + ' '
print(text)
write_to_excel(data)
except Exception as e:
pass
except:
pass
async def run():
names = get_names_of('pages/')
it = iter(names)
cur = 'pages/{}'.format(next(it))
tasks = []
urls = open(cur, encoding='utf-8').readlines()
async with ClientSession(headers=headers) as session:
for i, url in enumerate(urls):
if i % 10:
proxy = proxy_local.get_proxy()
task = asyncio.ensure_future(save_data_from(url, cur, session, proxy))
tasks.append(task)
end_of_file = url
return await asyncio.gather(*tasks)
if __name__ == '__main__':
proxy_local = Proxy()
user_agent = UserAgent()
save = Save()
semaphore, time_error429, time_error503 = read_conf()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print('total', total)

Относительно замораживания программы при ошибках 429 и 503, можно попробовать использовать Retry-After заголовок ответа для 429 ошибки и задержку повторной попытки соединения, а для 503 ошибки попробовать использовать повторный запрос с увеличенной задержкой.

if resp.status == 429:
if 'Retry-After' in resp.headers:
delay = int(resp.headers['Retry-After'])
else:
delay = time_error429
print('[429] waiting for {} seconds'.format(delay))
await asyncio.sleep(delay)if resp.status == 503:
print('[503] waiting for {} seconds'.format(time_error503))
await asyncio.sleep(time_error503)

Эти стратегии позволят вашей программе корректно обрабатывать ошибки 429 и 503 и продолжать парсинг после задержки.

20 Апр 2024 в 13:19
Не можешь разобраться в этой теме?
Обратись за помощью к экспертам
Гарантированные бесплатные доработки в течение 1 года
Быстрое выполнение от 2 часов
Проверка работы на плагиат
Поможем написать учебную работу
Прямой эфир