Commit 977e6d8

fixed BaseWorker.load_items(), added broker test for rabbit.
1 parent 936cbbb commit 977e6d8

23 files changed: +384 additions, -118 deletions

CHANGES

Lines changed: 7 additions & 0 deletions
@@ -2,6 +2,13 @@
 transistor: CHANGES
 ========================

+12/3/18
+- Fixed a bug in the BaseWorker.load_items() method which previously resulted
+  in losing scrape data when the number of workers did not equal the number
+  of tasks. Now, any number of workers or any pool size will produce
+  consistent export/save results, while scrape time will change in proportion
+  to the number of workers assigned. Wrote tests to ensure the same.
+
 11/30/18
 - BaseGroup.hired_worker() method name has been changed to `get_worker()`
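
As context for the note above, here is a minimal sketch of the kind of regression check it implies. The actual tests added in this commit are not shown on this page; the helper names (ndb, get_job_results, delete_job) are simply reused from the books_to_scrape example files changed below.

# Hypothetical regression sketch only -- the tests added in this commit are not
# shown on this page. The helpers below (ndb, get_job_results, delete_job) are
# used the same way in examples/books_to_scrape/schedulers/brokers/worker_main.py.
from examples.books_to_scrape.persistence import ndb


def check_exports_with_unequal_workers_and_tasks(manager, get_job_results,
                                                 delete_job, expected_tasks=3):
    """Run a job configured with fewer workers than tasks (e.g. 2 workers and
    3 keyword tasks); per the 12/3/18 fix, every task should still be saved."""
    manager.main()  # run the scrape job to completion
    results = get_job_results(ndb, 'books_scrape')
    assert len(results) == expected_tasks  # one saved item per task, not per worker
    delete_job(ndb, 'books_scrape')  # clean up the persisted test data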

README.rst

Lines changed: 2 additions & 1 deletion
@@ -54,7 +54,7 @@ Development of Transistor is sponsored by `BOM Quote Manufacturing <https://www.
 4. Easily integrate within a web app like `Flask <https://github.com/pallets/flask>`_, `Django <https://github.com/django/django>`_ , or other python based `web frameworks <https://github.com/vinta/awesome-python#web-frameworks>`_.
 5. Provide spreadsheet based data ingest and export options, like import a list of search terms from excel, ods, csv, and export data to each as well.
 6. Utilize quick and easy integrated task work queues which can be automatically filled with data search terms by a simple spreadsheet import.
-7. Able to integrate with more robust task queues like `Celery <https://github.com/celery/celery>`_ while using `rabbitmq <https://www.rabbitmq.com/>`_ or `redis <https://redis.io/>`_ as needed.
+7. Able to integrate with more robust task queues like `Celery <https://github.com/celery/celery>`_ while using `rabbitmq <https://www.rabbitmq.com/>`_ or `redis <https://redis.io/>`_ as a message broker, as desired.
 8. Provide hooks for users to persist data via any method they choose, while also supporting our own opinionated choice which is a `PostgreSQL <https://www.postgresql.org/>`_ database along with `newt.db <https://github.com/newtdb/db>`_.
 9. Contain useful abstractions, classes, and interfaces for scraping and crawling with machine learning assistance (wip, timeline tbd).
 10. Further support data science use cases of the persisted data, where convenient and useful for us to provide in this library (wip, timeline tbd).

@@ -485,6 +485,7 @@ Transistor provides useful layers and objects in the following categories:

 - see ``transistor/schedulers/brokers``
 - provides the ``ExchangeQueue`` class in transistor.schedulers.brokers.queues which can be passed to the ``tasks`` parameter of ``BaseWorkGroupManager``
+- Just pass the appropriate connection string to ``ExchangeQueue`` and ``BaseWorkGroupManager`` and you can use either RabbitMQ or Redis as a message broker, thanks to `kombu <https://github.com/celery/kombu>`_.
 - in this case, the ``BaseWorkGroupManager`` also acts as an AMQP ``consumer`` which can receive messages from the RabbitMQ message broker
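
To illustrate that added point, here is a minimal sketch of swapping brokers by connection string. The ExchangeQueue, Connection, and BaseWorkGroupManager calls mirror the example modules changed in this commit; the Redis URL and the empty groups list are placeholders, not values taken from the repository.

from kombu import Connection
from transistor import BaseWorkGroupManager
from transistor.schedulers.brokers.queues import ExchangeQueue

trackers = ['books.toscrape.com']  # one tracker name per WorkGroup
tasks = ExchangeQueue(trackers)

# RabbitMQ as the message broker ...
connection = Connection("pyamqp://guest:guest@localhost:5672//")
# ... or, since kombu abstracts the transport, Redis instead (placeholder URL):
# connection = Connection("redis://localhost:6379/0")

groups = []  # placeholder: fill with WorkGroup definitions, as in worker_main.py
manager = BaseWorkGroupManager('books_scrape', tasks, workgroups=groups,
                               pool=10, connection=connection)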

appveyor.yml

Lines changed: 1 addition & 0 deletions
@@ -120,6 +120,7 @@ install:
 - cmd: pip install pytest==4.0.1
 - cmd: pip install pytest-cov==2.6.0
 - cmd: pip install coverage==4.5.2
+- cmd: pip install cssselect==1.0.3
 - cmd: pip install mock==2.0.0
 - cmd: pip install gevent==1.3.7
 - cmd: pip install newt.db==0.9.0

appveyor/rabbitmq.ps1

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 # """
 # transistor.appveyor.rabbitmq
 # ~~~~~~~~~~~~
-# This is a powershell script to install rabbitmq in a windows server environment.
+# This is a powershell script to install rabbitmq on Windows.
 # It is included here to facilitate various test cases in appveyor CI.
 #
 # :copyright: Copyright (C) 2018 by BOM Quote Limited

appveyor/redis.ps1

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 # """
 # transistor.appveyor.redis
 # ~~~~~~~~~~~~
-# This is a powershell script to install redis in a windows server environment.
+# This is a powershell script to install Redis on Windows.
 # It is included here to facilitate various test cases in appveyor CI.
 #
 # :copyright: Copyright (C) 2018 by BOM Quote Limited

examples/books_to_scrape/manager.py

Lines changed: 3 additions & 3 deletions
@@ -12,7 +12,7 @@
 """
 import gevent
 from transistor import BaseWorkGroupManager
-
+from transistor.utility.logging import logger

 class BooksWorkGroupManager(BaseWorkGroupManager):
     """

@@ -36,7 +36,7 @@ def monitor(self, target):
         :param target: the target parameter here is a <Worker()> class object and
         you must call target.spawn_spider() to start the Worker.
         """
-        print(f'spawning {target}')
+        logger.info(f'spawning {target}')
         target.spawn_spider()  # this must be called. It is required.
         # Calling spawn_spider() above instructs the Worker object to start
         # the scrape. So there will be some wait period at this point for each

@@ -49,7 +49,7 @@ def monitor(self, target):
         # here, event represents returned scraper objects which the worker has
         # completed. We can iterate through the event objects and, for example,
         # apply some data transformation, delete failed scrapes, or save data
-        print(f'THIS IS A MONITOR EVENT - > {event}')
+        logger.info(f'THIS IS A MONITOR EVENT - > {event}')
         # This last line is required, ensure the below gevent.sleep(0) remains.
         gevent.sleep(0)
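
As those comments note, the monitor() loop is a natural place to drop failed scrapes before doing anything else with them. A rough sketch of such an override is below; the iteration over target.events and the use of a missing price as the failure signal are assumptions made for illustration, not behavior confirmed by this commit.

import gevent
from transistor import BaseWorkGroupManager
from transistor.utility.logging import logger


class FilteringBooksManager(BaseWorkGroupManager):
    """Hypothetical variant of BooksWorkGroupManager that skips failed scrapes."""

    def monitor(self, target):
        target.spawn_spider()  # required: instructs the Worker to start the scrape
        for event in target.events:  # assumes completed spiders land in .events
            if getattr(event, 'price', None) is None:
                # treat a missing price as a failed scrape (an example heuristic
                # for books_to_scrape, not a library convention) and skip it
                logger.info(f'dropping failed scrape -> {event}')
                continue
            logger.info(f'keeping scrape -> {event}')
        gevent.sleep(0)  # required: yield control back to the gevent hub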

examples/books_to_scrape/schedulers/brokers/client_main.py

Lines changed: 71 additions & 9 deletions
@@ -5,30 +5,92 @@
 This module implements a client producer for testing
 and example.

+To run this example, first run:
+
+>>> python client_worker.py
+
+This will start the worker and await the task. Then, in a separate
+command prompt, to simulate a message sent to the broker queue, run:
+
+>>> python client_main.py
+
+The result should be that the worker will process the `keywords` tasks.
+
 :copyright: Copyright (C) 2018 by BOM Quote Limited
 :license: The MIT License, see LICENSE for more details.
 ~~~~~~~~~~~~
 """
-
+import time
+from kombu import Connection
 from kombu.pools import producers
-from examples.books_to_scrape.schedulers.brokers.worker_main import tasks
+from transistor.schedulers.brokers.queues import ExchangeQueue
+from transistor.utility.logging import logger
+# from examples.books_to_scrape.schedulers.brokers.worker_main import tasks

-task_exchange = tasks.task_exchange
+trackers = ['books.toscrape.com']
+tasks = ExchangeQueue(trackers)
+connection = Connection("pyamqp://guest:guest@localhost:5672//")

+def _publish(producer, payload, routing_key, exchange):
+    """
+    :param producer: example ->
+    >>> with producers[connection].acquire(block=True) as producer:
+    :param payload: example ->
+    >>> payload = {'keywords': keywords, 'kwargs': kwargs}
+    :param routing_key: Type[str]: 'books.toscrape.com'
+    :param exchange: a kombu Type[Exchange] class object
+    :return:
+    """
+    producer.publish(payload,
+                     serializer='json',
+                     exchange=exchange,
+                     routing_key=routing_key,
+                     declare=[exchange],
+                     retry=True,
+                     retry_policy={
+                         'interval_start': 0,  # First retry immediately,
+                         'interval_step': 2,  # then increase by 2s for every retry.
+                         'interval_max': 5,  # don't exceed 5s between retries.
+                         'max_retries': 3,  # give up after 3 tries.
+                     })
+
+
-def send_as_task(connection, keywords, kwargs={}):
+def send_as_task(connection, keywords, routing_key, exchange, kwargs={}):
     payload = {'keywords': keywords, 'kwargs': kwargs}

     with producers[connection].acquire(block=True) as producer:
+        # for tracker in tasks.trackers:
+        #     _publish(producer=producer, payload=payload, routing_key=tracker, exchange=exchange)
         producer.publish(payload,
                          serializer='json',
-                         exchange=task_exchange,
-                         declare=[task_exchange])
+                         # if there is more than one tracker, use something like
+                         # the _publish above, with a for loop for each tracker
+                         routing_key=routing_key,
+                         exchange=exchange,
+                         declare=[exchange],
+                         )


 if __name__ == '__main__':

     from kombu import Connection
-    keywords = '["Soumission", "Rip it Up and Start Again", "Black Dust"]'
-    connection = Connection("pyamqp://guest:guest@localhost:5672//")
-    send_as_task(connection, keywords=keywords, kwargs={})
+    from kombu.utils.debug import setup_logging
+
+    # setup root logger
+    setup_logging(loglevel='INFO', loggers=[''])
+
+    keyword_1 = '["Soumission"]'
+    keyword_2 = '["Rip it Up and Start Again"]'
+    keywords = '["Black Dust", "When We Collided"]'
+
+    with Connection("pyamqp://guest:guest@localhost:5672//") as conn:
+        send_as_task(conn, keywords=keyword_1, routing_key='books.toscrape.com',
+                     exchange=tasks.task_exchange, kwargs={})
+        logger.info(f'sent task {keyword_1}')
+        send_as_task(conn, keywords=keyword_2, routing_key='books.toscrape.com',
+                     exchange=tasks.task_exchange, kwargs={})
+        logger.info(f'sent task {keyword_2}')
+        send_as_task(conn, keywords=keywords, routing_key='books.toscrape.com',
+                     exchange=tasks.task_exchange, kwargs={})
+        logger.info(f'sent task {keywords}')
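
The commented-out loop above hints at fanning tasks out to more than one tracker. Here is a short sketch of what that could look like, reusing the _publish helper defined in this file; the second tracker name is invented for illustration, and iterating ExchangeQueue.trackers is assumed from the comment rather than a documented API.

from kombu.pools import producers
from transistor.schedulers.brokers.queues import ExchangeQueue

# 'quotes.toscrape.com' is a hypothetical second tracker, for illustration only
multi_tasks = ExchangeQueue(['books.toscrape.com', 'quotes.toscrape.com'])


def send_to_all_trackers(connection, keywords, kwargs={}):
    payload = {'keywords': keywords, 'kwargs': kwargs}
    with producers[connection].acquire(block=True) as producer:
        # one publish per tracker, so each WorkGroup queue receives the task
        for tracker in multi_tasks.trackers:
            _publish(producer=producer, payload=payload,
                     routing_key=tracker, exchange=multi_tasks.task_exchange)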

examples/books_to_scrape/schedulers/brokers/worker_main.py

Lines changed: 28 additions & 13 deletions
@@ -4,6 +4,17 @@
 ~~~~~~~~~~~~
 Entry point to run the books_to_scrape example.

+To run this example, first run:
+
+>>> python client_main.py
+
+This will start the producer and send the tasks to the broker Exchange queue.
+Then, in a separate command prompt, run:
+
+>>> python client_worker.py
+
+The result should be that the worker will process the `keywords` tasks.
+
 Note:

 The primary use case where the current Transistor design shines, is when you need to

@@ -15,9 +26,10 @@
 of workers to the search page. Each worker has one task issued, a task to execute the
 search for the term it has been assigned, and return to us with the response.

-The example highlighted here, employs a "crawl" mechanism inside each
-of the scraper objects. This is not really showcasing the optimal use case for
-a Transistor SplashScraper with Manager/WorkGroups, per the current design.
+The example highlighted here in `books_to_scrape` employs a "crawl" mechanism
+inside each of the scraper objects. This is not really showcasing the optimal
+use case for a Transistor SplashScraper with Manager/WorkGroups, per the
+current design.

 The reason is, in this example, we send out 20 workers at once. Each worker
 crawls through EACH PAGE on the books.toscrape.com website, until the worker finds

@@ -58,10 +70,11 @@
 from examples.books_to_scrape.manager import BooksWorkGroupManager
 from examples.books_to_scrape.persistence.serialization import (
     BookItems, BookItemsLoader)
+from transistor.utility.logging import logger


-# 1) Create a FanoutTask instance and connection object to prepare to use
-# RabbitMQ message broker.
+# 1) Create an ExchangeQueue instance and connection object to prepare
+# to use the RabbitMQ message broker.
 # Set a list of tracker names, with one tracker name for each WorkGroup you create
 # in step three. Ensure the tracker name matches the WorkGroup.name in step four.

@@ -98,17 +111,17 @@
         items=BookItems,
         loader=BookItemsLoader,
         exporters=exporters,
-        workers=3,  # this creates 3 scrapers and assigns each a book as a task
-        kwargs={'timeout': (3.0, 20.0), 'qtimeout': 10})
+        workers=2,  # this creates x scrapers and assigns each a book as a task
+        kwargs={'timeout': (3.0, 20.0)})
 ]

 # 4) Last, setup the Manager. You can constrain the number of workers actually
 # deployed, through the `pool` parameter. For example, this is useful
 # when using a Crawlera 'C10' plan which limits concurrency to 10. To deploy all
 # the workers concurrently, set the pool to be marginally larger than the number
 # of total workers assigned in groups in step #3 above.
-manager = BooksWorkGroupManager('books_scrape', tasks, workgroups=groups, pool=5,
-                                connection=connection, should_stop=True)
+manager = BooksWorkGroupManager('books_scrape', tasks, workgroups=groups, pool=10,
+                                connection=connection)

 if __name__ == "__main__":

@@ -117,13 +130,15 @@
     setup_logging(loglevel='INFO', loggers=[''])
     with Connection('amqp://guest:guest@localhost:5672//') as conn:
         try:
+
             manager.main()  # call manager.main() to start the job.
         except KeyboardInterrupt:
            print('bye bye')
     # below shows an example of navigating your persisted data after the scrape

     result = get_job_results(ndb, 'books_scrape')
-    print(f'Printing: books_scrape result')
-    for r in result:
-        print(f"{r['book_title']}, {r['price']}, {r['stock']}")
-    delete_job(ndb, 'books_scrape')
+    logger.info(f'Printing: books_scrape result =>')
+    if result:
+        for r in result:
+            logger.info(f"{r['book_title']}, {r['price']}, {r['stock']}")
+        delete_job(ndb, 'books_scrape')

examples/books_to_scrape/schedulers/stateful_book/main.py

Lines changed: 2 additions & 2 deletions
@@ -116,8 +116,8 @@ def get_file_path(filename):
 # 5) Last, setup the Manager. You can constrain the number of workers actually
 # deployed, through the `pool` parameter. For example, this is useful
 # when using a Crawlera 'C10' plan which limits concurrency to 10. To deploy all
-# the workers concurrently, set the pool to be marginally larger than the number
-# of total workers assigned in groups in step #3 above.
+# the workers concurrently, set the pool +1 higher than the number of total
+# workers assigned in groups in step #3 above. The +1 is for the pool manager.
 manager = BooksWorkGroupManager('books_scrape', tasks, workgroups=groups, pool=5)
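
A quick illustration of that sizing rule; the numbers are only an example, and `tasks` and `groups` are the objects already defined earlier in this script.

# If step #3 defines, say, 20 workers in total across all WorkGroups, then
# deploying them all concurrently calls for pool = 20 + 1, with the extra
# slot reserved for the pool manager itself (per the comment above).
total_workers = 20
manager = BooksWorkGroupManager('books_scrape', tasks, workgroups=groups,
                                pool=total_workers + 1)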

examples/books_to_scrape/workgroup.py

Lines changed: 8 additions & 8 deletions
@@ -12,7 +12,7 @@
 from transistor import BaseWorker
 from examples.books_to_scrape.persistence import ndb
 from transistor.persistence.newt_db.collections import SpiderList
-
+from transistor.utility.logging import logger

 class BooksWorker(BaseWorker):
     """

@@ -44,8 +44,8 @@ def pre_process_exports(self, spider, task):
         try:
             # create the list with the job name if it doesn't already exist
             ndb.root.spiders.add(self.job_id, SpiderList())
-            print(f'Worker {self.name}-{self.number} created a new scrape_list for '
-                  f'{self.job_id}')
+            logger.info(f'Worker {self.name}-{self.number} created a new spider '
+                        f'list for {self.job_id}')
         except KeyError:
             # will be raised if there is already a list with the same job_name
             pass

@@ -54,11 +54,11 @@ def pre_process_exports(self, spider, task):
             # save the items object to newt.db
             ndb.root.spiders[self.job_id].add(items)
             ndb.commit()
-            print(f'Worker {self.name}-{self.number} saved {items.__repr__()} to '
+            logger.info(f'Worker {self.name}-{self.number} saved {items.__repr__()} to '
                         f'scrape_list "{self.job_id}" for task {task}.')
         else:
             # if job_id is NONE then we'll skip saving the objects
-            print(f'Worker {self.name}-{self.number} said job_name is {self.job_id} '
+            logger.info(f'Worker {self.name}-{self.number} said job_name is {self.job_id} '
                         f'so will not save it.')

     def post_process_exports(self, spider, task):

@@ -70,6 +70,6 @@ class attribute called `events`.

         """
         self.events.append(spider)
-        print(f'{self.name} has {spider.stock} inventory status.')
-        print(f'pricing: {spider.price}')
-        print(f'Worker {self.name}-{self.number} finished task {task}')
+        logger.info(f'{self.name} has {spider.stock} inventory status.')
+        logger.info(f'pricing: {spider.price}')
+        logger.info(f'Worker {self.name}-{self.number} finished task {task}')
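
For reference, a small sketch of inspecting what pre_process_exports() persisted; the get_job_results(ndb, 'books_scrape') helper used in worker_main.py is the route the example actually takes, and iterating the SpiderList directly is an assumption made here for illustration.

from examples.books_to_scrape.persistence import ndb

# pre_process_exports() above stores each exported items object in
# ndb.root.spiders[job_id], so the saved results for the 'books_scrape' job
# live in that collection.
saved = ndb.root.spiders['books_scrape']
for items in saved:  # assumes SpiderList supports iteration (not confirmed here)
    print(items.__repr__())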
