Skip to content

Commit 5966096

Browse files
author
Guewen Baconnier
committed
[CHG] multiprocessing: The Jobs workers are affected by the database being closed when they
run in the Cron Worker process. Thus, when OpenERP run in multiprocess, the connector does not start job workers threads. Instead, the new script ``openerp-connector-worker`` should be used. It spawns processes which start the job workers threads themselves. Example of usage: $ PYTHONPATH=/path/to/server /path/to/connector/openerp-connector-worker \ --config /path/to/configfile \ --workers=2 --logfile=/path/to/logfile This is not ideal as soon as we have to ensure that OpenERP AND the script are running. However: it still works normally when OpenERP is not using multiprocessing and this change allow more control on the worker processes (to implement PG's NOTIFY for instance). More details in the nested history of the revision.
2 parents 0961df4 + ac316d0 commit 5966096

File tree

6 files changed

+236
-62
lines changed

6 files changed

+236
-62
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
.. _multiprocessing:
2+
3+
4+
######################################
5+
Use the connector with multiprocessing
6+
######################################
7+
8+
When OpenERP is launched with 1 process, the jobs worker will run
9+
threaded in the same process.
10+
11+
When OpenERP is launched with multiple processes using the option
12+
``--workers``, the jobs workers are not independant processes, however,
13+
you have to launch them separately with the script
14+
``openerp-connector-worker`` located in the connector module.
15+
16+
It takes the same arguments and configuration file than the OpenERP
17+
server.
18+
19+
.. important:: The Python path must contain the path to the OpenERP
20+
server when ``openerp-connector-worker`` is launched.
21+
22+
Example::
23+
24+
$ PYTHONPATH=/path/to/server connector/openerp-connector-worker --config /path/to/configfile \
25+
--workers=2 --logfile=/path/to/logfile
26+
27+
The 'Enqueue Jobs' scheduled action is useless when multiprocessing is
28+
used.
29+
30+
.. note:: The ``openerp-connector-worker`` should not be launched
31+
alongside OpenERP when the latter does not run in multiprocess
32+
mode, because the interprocess signaling would not be done.

connector/doc/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Developer's guide
3131

3232
guides/overview.rst
3333
guides/bootstrap_connector.rst
34+
guides/multiprocessing.rst
3435

3536
API Reference
3637
=============

connector/openerp-connector-worker

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#!/usr/bin/env python
2+
import sys
3+
import logging
4+
import os
5+
import signal
6+
import time
7+
import threading
8+
from contextlib import closing
9+
10+
import openerp
11+
from openerp.cli import server as servercli
12+
import openerp.service.workers as workers
13+
from openerp.modules.registry import RegistryManager
14+
from openerp.tools import config
15+
16+
_logger = logging.getLogger('openerp-connector-worker')
17+
18+
19+
class Multicornnector(workers.Multicorn):
20+
21+
def __init__(self, app):
22+
super(Multicornnector, self).__init__(app)
23+
self.address = ('0.0.0.0', 0)
24+
self.population = config['workers'] or 1
25+
self.workers_connector = {}
26+
27+
def process_spawn(self):
28+
while len(self.workers_connector) < self.population:
29+
self.worker_spawn(WorkerConnector, self.workers_connector)
30+
31+
def worker_pop(self, pid):
32+
if pid in self.workers:
33+
_logger.debug("Worker (%s) unregistered", pid)
34+
try:
35+
self.workers_connector.pop(pid, None)
36+
u = self.workers.pop(pid)
37+
u.close()
38+
except OSError:
39+
return
40+
41+
42+
class WorkerConnector(workers.Worker):
43+
""" HTTP Request workers """
44+
45+
def __init__(self, multi):
46+
super(WorkerConnector, self).__init__(multi)
47+
self.db_index = 0
48+
49+
def process_work(self):
50+
if config['db_name']:
51+
db_names = config['db_name'].split(',')
52+
else:
53+
services = openerp.netsvc.ExportService._services
54+
if services.get('db'):
55+
db_names = services['db'].exp_list(True)
56+
else:
57+
db_names = []
58+
if len(db_names):
59+
self.db_index = (self.db_index + 1) % len(db_names)
60+
db_name = db_names[self.db_index]
61+
self.setproctitle(db_name)
62+
db = openerp.sql_db.db_connect(db_name)
63+
threading.current_thread().dbname = db_name
64+
with closing(db.cursor()) as cr:
65+
cr.execute("SELECT 1 FROM ir_module_module "
66+
"WHERE name = %s "
67+
"AND state = %s", ('connector', 'installed'))
68+
if cr.fetchone():
69+
RegistryManager.check_registry_signaling(db_name)
70+
registry = openerp.pooler.get_pool(db_name)
71+
if registry:
72+
queue_worker = registry['queue.worker']
73+
queue_worker.assign_then_enqueue(cr,
74+
openerp.SUPERUSER_ID,
75+
max_jobs=50)
76+
RegistryManager.signal_caches_change(db_name)
77+
else:
78+
self.db_index = 0
79+
80+
def sleep(self):
81+
# Really sleep once all the databases have been processed.
82+
if self.db_index == 0:
83+
interval = 15 + self.pid % self.multi.population # chorus effect
84+
time.sleep(interval)
85+
86+
def start(self):
87+
workers.Worker.start(self)
88+
openerp.service.start_internal()
89+
90+
91+
if __name__ == "__main__":
92+
args = sys.argv[1:]
93+
servercli.check_root_user()
94+
config.parse_config(args)
95+
96+
servercli.check_postgres_user()
97+
openerp.netsvc.init_logger()
98+
servercli.report_configuration()
99+
100+
openerp.multi_process = True
101+
openerp.worker_connector = True
102+
Multicornnector(openerp.service.wsgi_server.application).run()

connector/queue/model.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from .job import STATES, DONE, PENDING, OpenERPJobStorage
3131
from .worker import WORKER_TIMEOUT
3232
from ..session import ConnectorSession
33+
from .worker import watcher
3334

3435
_logger = logging.getLogger(__name__)
3536

@@ -180,7 +181,6 @@ class QueueWorker(orm.Model):
180181
_rec_name = 'uuid'
181182

182183
worker_timeout = WORKER_TIMEOUT
183-
_worker = None
184184

185185
_columns = {
186186
'uuid': fields.char('UUID', readonly=True, select=True, required=True),
@@ -189,7 +189,7 @@ class QueueWorker(orm.Model):
189189
'date_alive': fields.datetime('Last Alive Check', readonly=True),
190190
'job_ids': fields.one2many('queue.job', 'worker_id',
191191
string='Jobs', readonly=True),
192-
}
192+
}
193193

194194
def _notify_alive(self, cr, uid, worker, context=None):
195195
worker_ids = self.search(cr, uid,
@@ -204,7 +204,6 @@ def _notify_alive(self, cr, uid, worker, context=None):
204204
'date_start': now_fmt,
205205
'date_alive': now_fmt},
206206
context=context)
207-
self._worker = worker
208207
else:
209208
self.write(cr, uid, worker_ids,
210209
{'date_alive': now_fmt}, context=context)
@@ -218,23 +217,16 @@ def _purge_dead_workers(self, cr, uid, context=None):
218217
dead_workers = self.read(cr, uid, dead_ids, ['uuid'], context=context)
219218
for worker in dead_workers:
220219
_logger.debug('Worker %s is dead', worker['uuid'])
221-
# exists in self._workers only for the same process and pool
222-
if worker['uuid'] == self._worker:
223-
_logger.error('Worker %s should be alive, '
224-
'but appears to be dead.',
225-
worker['uuid'])
226-
self._worker = None
227220
try:
228221
self.unlink(cr, uid, dead_ids, context=context)
229222
except Exception:
230223
_logger.debug("Failed attempt to unlink a dead worker, likely due "
231-
"to another transaction in progress. "
232-
"Trace of the failed unlink "
233-
"%s attempt: ", self._worker.uuid, exc_info=True)
224+
"to another transaction in progress.")
234225

235226
def _worker_id(self, cr, uid, context=None):
236-
assert self._worker
237-
worker_ids = self.search(cr, uid, [('uuid', '=', self._worker.uuid)],
227+
worker = watcher.worker_for_db(cr.dbname)
228+
assert worker
229+
worker_ids = self.search(cr, uid, [('uuid', '=', worker.uuid)],
238230
context=context)
239231
assert len(worker_ids) == 1, ("%s worker found in database instead "
240232
"of 1" % len(worker_ids))
@@ -268,7 +260,8 @@ def assign_jobs(self, cr, uid, max_jobs=None, context=None):
268260
:param max_jobs: maximal limit of jobs to assign on a worker
269261
:type max_jobs: int
270262
"""
271-
if self._worker:
263+
worker = watcher.worker_for_db(cr.dbname)
264+
if worker:
272265
self._assign_jobs(cr, uid, max_jobs=max_jobs, context=context)
273266
else:
274267
_logger.debug('No worker started for process %s', os.getpid())
@@ -278,7 +271,8 @@ def enqueue_jobs(self, cr, uid, context=None):
278271
""" Enqueue all the jobs assigned to the worker of the current
279272
process
280273
"""
281-
if self._worker:
274+
worker = watcher.worker_for_db(cr.dbname)
275+
if worker:
282276
self._enqueue_jobs(cr, uid, context=context)
283277
else:
284278
_logger.debug('No worker started for process %s', os.getpid())
@@ -295,6 +289,7 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None):
295289
# use a SAVEPOINT to be able to rollback this part of the
296290
# transaction without failing the whole transaction if the LOCK
297291
# cannot be acquired
292+
worker = watcher.worker_for_db(cr.dbname)
298293
cr.execute("SAVEPOINT queue_assign_jobs")
299294
try:
300295
cr.execute(sql, log_exceptions=False)
@@ -306,34 +301,39 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None):
306301
_logger.debug("Failed attempt to assign jobs, likely due to "
307302
"another transaction in progress. "
308303
"Trace of the failed assignment of jobs on worker "
309-
"%s attempt: ", self._worker.uuid, exc_info=True)
304+
"%s attempt: ", worker.uuid, exc_info=True)
310305
return
311306
job_rows = cr.fetchall()
312307
if not job_rows:
313-
_logger.debug('No job to assign to worker %s', self._worker.uuid)
308+
_logger.debug('No job to assign to worker %s', worker.uuid)
314309
return
315310
job_ids = [id for id, in job_rows]
316311

317312
worker_id = self._worker_id(cr, uid, context=context)
318313
_logger.debug('Assign %d jobs to worker %s', len(job_ids),
319-
self._worker.uuid)
314+
worker.uuid)
320315
# ready to be enqueued in the worker
321316
try:
322317
self.pool.get('queue.job').write(cr, uid, job_ids,
323-
{'state': 'pending',
324-
'worker_id': worker_id},
325-
context=context)
318+
{'state': 'pending',
319+
'worker_id': worker_id},
320+
context=context)
326321
except Exception:
327322
pass # will be assigned to another worker
328323

329324
def _enqueue_jobs(self, cr, uid, context=None):
330-
""" Called by an ir.cron, add to the queue all the jobs not
331-
already queued"""
325+
""" Add to the queue of the worker all the jobs not
326+
yet queued but already assigned."""
327+
job_obj = self.pool.get('queue.job')
332328
db_worker_id = self._worker_id(cr, uid, context=context)
333-
db_worker = self.browse(cr, uid, db_worker_id, context=context)
334-
for job in db_worker.job_ids:
335-
if job.state == 'pending':
336-
self._worker.enqueue_job_uuid(job.uuid)
329+
job_ids = job_obj.search(cr, uid,
330+
[('worker_id', '=', db_worker_id),
331+
('state', '=', 'pending')],
332+
context=context)
333+
worker = watcher.worker_for_db(cr.dbname)
334+
jobs = job_obj.read(cr, uid, job_ids, ['uuid'], context=context)
335+
for job in jobs:
336+
worker.enqueue_job_uuid(job['uuid'])
337337

338338

339339
class requeue_job(orm.TransientModel):

0 commit comments

Comments
 (0)