scrapy_pagestorage.py
"""
Middleware for implementing visited pages storage using hubstorage
"""
import logging
import os

from scrapinghub.hubstorage import ValueTooLarge
from scrapinghub.hubstorage.utils import urlpathjoin
from scrapy.exceptions import NotConfigured, IgnoreRequest
from scrapy.utils.request import request_fingerprint
from scrapy.http import TextResponse
from scrapy import signals
from scrapy.item import Field

try:
    from cgi import parse_qsl
except ImportError:
    from urllib.parse import parse_qsl

try:
    from scrapy.item import DictItem as ScrapyItem
except ImportError:
    from scrapy.item import Item as ScrapyItem


logger = logging.getLogger(__name__)

_COLLECTION_NAME = "Pages"


class PageStorageMiddleware:
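
    # Instantiate the middleware only if regular page storage or on-error
    # storage is enabled; otherwise opt out via NotConfigured.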
    @classmethod
    def from_crawler(cls, crawler):
        enabled, on_error_enabled = _get_enabled_status(crawler.settings)
        if enabled or on_error_enabled:
            return cls(crawler)
        raise NotConfigured
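
    # Read the storage limits from the crawler settings and set up a
    # HubStorage collection writer for the "Pages" collection.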
    def __init__(self, crawler):
        # FIXME move sh_scrapy.hsref to python-hubstorage and drop it
        try:
            from sh_scrapy.hsref import hsref
            self.hsref = hsref
        except ImportError:
            raise NotConfigured

        settings = crawler.settings
        # Pages go to the cached store collection ('cs') or, when
        # PAGE_STORAGE_MODE is 'VERSIONED_CACHE', to the versioned
        # cached store ('vcs').
        mode = 'cs'
        if settings.get('PAGE_STORAGE_MODE') == 'VERSIONED_CACHE':
            mode = 'vcs'
        self.trim_html = False
        if settings.getbool('PAGE_STORAGE_TRIM_HTML'):
            self.trim_html = True
        self.enabled, self.on_error_enabled = _get_enabled_status(settings)
        self.limits = {
            'all': crawler.settings.getint('PAGE_STORAGE_LIMIT'),
            'error': crawler.settings.getint('PAGE_STORAGE_ON_ERROR_LIMIT'),
        }
        self.counters = {
            'all': 0,
            'error': 0,
        }
        self.cookies_seen = set()

        # Register the collection on the job metadata and create a
        # gzip-compressed batch writer for it.
        endpoint = urlpathjoin(hsref.project.collections.url,
                               mode, _COLLECTION_NAME)
        logger.info("HubStorage: writing pages to %s", endpoint)
        hsref.job.metadata.apipost('collection',
                                   jl=urlpathjoin(mode, _COLLECTION_NAME))
        self._writer = hsref.client.batchuploader.create_writer(
            endpoint, content_encoding='gzip', size=20)
        crawler.signals.connect(self.spider_closed,
                                signal=signals.spider_closed)
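
    # Flush and close the batch writer when the spider finishes.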
    def spider_closed(self, spider):
        self._writer.close()
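
    # Store every response passed to the spider (while storage is enabled)
    # until PAGE_STORAGE_LIMIT is reached.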
    def process_spider_input(self, response, spider):
        if self.enabled and (self.counters['all'] < self.limits['all']):
            self.counters['all'] += 1
            self.save_response(response, spider)
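
    # Store the response that caused a spider error, up to
    # PAGE_STORAGE_ON_ERROR_LIMIT pages; IgnoreRequest is not treated
    # as an error.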
    def process_spider_exception(self, response, exception, spider):
        if (self.on_error_enabled and
                not isinstance(exception, IgnoreRequest) and
                self.counters['error'] < self.limits['error']):
            self.counters['error'] += 1
            self.save_response(response, spider)
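
    # Serialize a text response into a collection item keyed by the request
    # fingerprint, including URL, encoding, new cookies, POST data and body.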
    def save_response(self, response, spider):
        if isinstance(response, TextResponse):
            # The request fingerprint doubles as the collection key, so a
            # page is stored at most once per fingerprint.
            fp = request_fingerprint(response.request)
            payload = {
                "_key": fp,
                "_jobid": self.hsref.job.key,
                "_type": "_pageitem",
                "_encoding": response.encoding,
                "url": response.url,
            }
            self._set_cookies(payload, response)

            if response.request.method == 'POST':
                payload["postdata"] = dict(parse_qsl(
                    response.request.body.decode()))

            try:
                payload["body"] = response.text
            except AttributeError:
                # Fallback for older Scrapy versions without Response.text.
                payload["body"] = response.body_as_unicode()

            if self.trim_html:
                payload['body'] = payload['body'].strip(' \r\n\0')

            # Skip pages that the batch writer would reject anyway.
            if len(payload['body']) > self._writer.maxitemsize:
                spider.logger.warning("Page not saved, body too large: <%s>" %
                                      response.url)
                return

            try:
                self._writer.write(payload)
            except ValueTooLarge as exc:
                spider.logger.warning("Page not saved, %s: <%s>" %
                                      (exc, response.url))
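
    # Tag every item produced from this response with "_cached_page_id"
    # (the request fingerprint), linking it to the stored page.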
    def process_spider_output(self, response, result, spider):
        fp = request_fingerprint(response.request)
        try:
            for r in result:
                if isinstance(r, ScrapyItem):
                    r.fields["_cached_page_id"] = Field()
                    r._values["_cached_page_id"] = fp
                elif isinstance(r, dict):
                    r["_cached_page_id"] = fp
                yield r
        except Exception as exc:
            self.process_spider_exception(response, exc, spider)
            raise
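
    # Record only Set-Cookie values not seen earlier in the job, so repeated
    # cookies are not stored with every page.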
    def _set_cookies(self, payload, response):
        cookies = []
        for cookie in [x.split(b';', 1)[0].decode('ISO-8859-1')
                       for x in response.headers.getlist('Set-Cookie')]:
            if cookie not in self.cookies_seen:
                self.cookies_seen.add(cookie)
                cookies.append(cookie)
        if cookies:
            payload["cookies"] = cookies
def _get_enabled_status(settings):
    enabled = settings.getbool('PAGE_STORAGE_ENABLED')
    autospider = (os.environ.get('SHUB_SPIDER_TYPE') in ('auto', 'portia'))
    on_error_enabled = settings.getbool('PAGE_STORAGE_ON_ERROR_ENABLED')
    return (enabled or autospider), on_error_enabled