Skip to content

Commit

Permalink
[IMP] refactor
Browse files Browse the repository at this point in the history
Move common methods in main task set
  • Loading branch information
nseinlet committed Aug 23, 2024
1 parent 4cf181b commit 2505dbb
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 114 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from setuptools import setup, find_packages

setup(name='OdooLocust',
version='1.6.5',
version='1.6.6',
description='Easily load test Odoo using Locust and odoolib.',
author='Nicolas Seinlet',
author_email='',
Expand Down
76 changes: 42 additions & 34 deletions src/OdooLocust/OdooTaskSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@


class OdooTaskSet(TaskSet):
model_name = False
model = False
form_fields = []
list_fields = []
kanban_fields = []
filters = []

def _get_user_context(self):
res = self.client.get_model('res.users').read(self.client.user_id, ['lang', 'tz'])
return {
Expand All @@ -40,25 +47,22 @@ def _get_user_context(self):
}

def _fields_view_get(self, model, view_mode):
res = self.client.get_model(model).load_views(views=[(False, vm) for vm in list(set(["list", "form", "search", view_mode]))])
res = self.client.get_model(model).get_views(views=[(False, vm) for vm in list(set(["list", "form", "search", view_mode]))])
return [n for n in res.get('fields_views', {}).get(view_mode, {}).get('fields', {}).keys()]

def _filters_view_get(self, model):
res = self.client.get_model(model).load_views(views=[(False, vm) for vm in list(set(["list", "form", "search"]))])
res = self.client.get_model(model).get_views(views=[(False, vm) for vm in list(set(["list", "form", "search"]))])
return [n['domain'] for n in res.get('filters', {})]

def _parse_children_menu(self, childs):
res = []
for child in childs:
if child['action']:
res.append(child['action'].split(","))
if child['children']:
res += self._parse_children_menu(child['children'])
return res

def _load_menu(self):
menus = []
res = self.client.get_model('ir.ui.menu').load_menus(False, context=self._get_user_context())
return self._parse_children_menu(res['children'])
for menu_id in res.keys():
menu = res[menu_id].get('action')
if menu:
menus.append(menu.split(","))
print(menus)
return menus

def _action_load(self, action_id, action_type=None):
if not action_type:
Expand All @@ -69,54 +73,59 @@ def _action_load(self, action_id, action_type=None):
def _check_fields(self, model, fields_list):
all_fields = self.client.get_model(model).fields_get()
return [ f for f in fields_list if f in all_fields.keys() ]

def _load_fields_lists(self, form=True, list=True, kanban=False, filters=True):
if form:
self.form_fields = self._fields_view_get(self.model_name, "form")
if list:
self.list_fields = self._fields_view_get(self.model_name, "list")
if kanban:
self.kanban_fields = self._fields_view_get(self.model_name, "kanban")
if filters:
self.filters = self._filters_view_get(self.model_name)


class OdooGenericTaskSet(OdooTaskSet):

def on_start(self):
self.menu = self._load_menu()
self.randomlyChooseMenu()

@task(1)
def randomlyChooseMenu(self):
self.model_name = False
self.model = False
while not self.model and self.model != "False":
while not self.model_name and self.model_name != "False":
item = random.choice(self.menu)
self.last_action = self._action_load(int(item[1]), item[0])
self.model = self.last_action.get('res_model', False)
self.form_fields = self._fields_view_get(self.model, "form")
self.list_fields = self._fields_view_get(self.model, "list")
if "kanban" in self.last_action.get('view_mode', []):
self.kanban_fields = self._fields_view_get(self.model, "kanban")
self.filters = self._filters_view_get(self.model)
self.model_name = self.last_action.get('res_model', False)
self.model = self.client.get_model(self.model_name)
self._load_fields_lists(kanban="kanban" in self.last_action.get('view_mode', []))

@task(30)
def form_view(self):
domain = []
if self.filters and random.randint(0, 10) < 3:
domain = random.choice(self.filters)
context = self.client.get_user_context()
model = self.client.get_model(self.model)
nbr_records = model.search_count(domain or [], context=context)
nbr_records = self.model.search_count(domain or [], context=context)
offset = random.randint(0, nbr_records % 80) if nbr_records > 80 else 0
ids = model.search(domain or [], limit=80, offset=offset, context=context)
ids = self.model.search(domain or [], limit=80, offset=offset, context=context)
if ids:
model.read(random.choice(ids), self.form_fields, context=context)
self.model.read(random.choice(ids), self.form_fields, context=context)

@task(10)
def list_view(self):
domain = []
if self.filters and random.randint(0, 10) < 3:
domain = random.choice(self.filters)
context = self.client.get_user_context()
model = self.client.get_model(self.model)
nbr_records = model.search_count(domain or [], context=context)
ids = model.search(domain or [], limit=80)
nbr_records = self.model.search_count(domain or [], context=context)
ids = self.model.search(domain or [], limit=80)
if nbr_records > 80:
offset = random.randint(0, nbr_records % 80)
ids = model.search(domain or [], limit=80, offset=offset, context=context)
ids = self.model.search(domain or [], limit=80, offset=offset, context=context)
if ids:
model.search_read([('id', 'in', ids)], self.list_fields, context=context)
self.model.search_read([('id', 'in', ids)], self.list_fields, context=context)

@task(10)
def kanban_view(self):
Expand All @@ -125,11 +134,10 @@ def kanban_view(self):
if self.filters and random.randint(0, 10) < 3:
domain = random.choice(self.filters)
context = self.client.get_user_context()
model = self.client.get_model(self.model)
nbr_records = model.search_count(domain or [], context=context)
ids = model.search(domain or [], limit=80)
nbr_records = self.model.search_count(domain or [], context=context)
ids = self.model.search(domain or [], limit=80)
if nbr_records > 80:
offset = random.randint(0, nbr_records % 80)
ids = model.search(domain or [], limit=80, offset=offset, context=context)
ids = self.model.search(domain or [], limit=80, offset=offset, context=context)
if ids:
model.search_read([('id', 'in', ids)], self.kanban_fields, context=context)
self.model.search_read([('id', 'in', ids)], self.kanban_fields, context=context)
83 changes: 35 additions & 48 deletions src/OdooLocust/crm/lead.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,89 +35,76 @@
import random

class CrmLead(OdooTaskSet):
list_fields = ["stage_id", "active", "company_id", "calendar_event_count", "company_id", "user_company_ids", "date_deadline", "create_date", "name", "contact_name", "partner_name", "email_from", "phone", "city", "state_id", "country_id", "partner_id", "user_id", "team_id", "active", "campaign_id", "referred", "medium_id", "probability", "source_id", "message_needaction", "tag_ids", "priority"]
form_fields = ["stage_id", "active", "company_id", "calendar_event_count", "sale_amount_total", "sale_order_count", "visitor_page_count", "duplicate_lead_count", "name", "company_currency", "expected_revenue", "recurring_revenue", "recurring_plan", "automated_probability", "is_automated_probability", "probability", "is_partner_visible", "partner_id", "partner_name", "street", "street2", "city", "state_id", "zip", "country_id", "website", "lang_active_count", "lang_code", "lang_id", "is_blacklisted", "partner_is_blacklisted", "phone_blacklisted", "mobile_blacklisted", "email_state", "phone_state", "partner_email_update", "partner_phone_update", "email_from", "phone", "lost_reason_id", "date_conversion", "user_company_ids", "contact_name", "title", "function", "mobile", "type", "user_id", "date_deadline", "priority", "tag_ids", "team_id", "lead_properties", "description", "campaign_id", "medium_id", "source_id", "referred", "date_open", "date_closed", "day_open", "day_close", "display_name"]
random_id = -1
_model_id = -1

@property
def model_id(self):
if self._model_id < 0:
irm = self.client.get_model('ir.model.data')
irm_id = irm.search_read([('model', '=', 'crm'), ('name', '=', 'model_crm_lead')], ['res_id'])
if irm_id:
self._model_id = irm_id[0]['res_id']
return self._model_id
model_name = 'crm.lead'
model = False

def on_start(self):
super().on_start()
self.model = self.client.get_model(self.model_name)
self._load_fields_lists()
self.test_searchread()


def _get_search_domain(self):
if self.filters and random.randint(0, 10) < 3:
return random.choice(self.filters)
name = names.get_first_name()
return ['|', '|', '|', ('partner_name', 'ilike', name), ('email_from', 'ilike', name), ('contact_name', 'ilike', name), ('name', 'ilike', name)]

@task(10)
def test_searchread(self):
lead_model = self.client.get_model('crm.lead')
res = lead_model.search_read(
self._get_search_domain(),
self.list_fields,
limit=80,
context=self.client.get_user_context()
)
if res:
self.random_id = res[0]['id']
search_domains = [
self._get_search_domain(),
[('id', '>', self.random_id)],
[]
]
for domain in search_domains:
res = self.model.search_read(
domain,
self.list_fields,
limit=80,
context=self.client.get_user_context()
)
if res:
self.random_id = random.choice(res)['id']
return True

@task(5)
def test_websearchread(self):
lead_model = self.client.get_model('crm.lead')
res = lead_model.web_search_read(
self._get_search_domain(),
self.list_fields,
res = self.model.web_search_read(
specification={f: {} for f in self.list_fields},
domain=self._get_search_domain(),
limit=80,
context=self.client.get_user_context()
)
if res:
if res and res['records']:
self.random_id = res[0]['id']

@task(20)
def test_read(self):
lead_model = self.client.get_model('crm.lead')
lead_model.read(self.random_id, self.form_fields, context=self.client.get_user_context())
self.model.read(self.random_id, self.form_fields, context=self.client.get_user_context())

@task(5)
def lead_stage_change(self):
lead_model = self.client.get_model('crm.lead')
s1 = 229
s2 = 99
res1 = lead_model.search([('id', '>', self.random_id), ('stage_id', '=', s1)], limit=1)
res2 = lead_model.search([('id', '>', self.random_id), ('stage_id', '=', s2)], limit=1)
res1 = self.model.search([('id', '>', self.random_id), ('stage_id', '=', s1)], limit=1)
res2 = self.model.search([('id', '>', self.random_id), ('stage_id', '=', s2)], limit=1)
if res1:
lead_model.write(res1[0], {'stage_id': s2}, context=self.client.get_user_context())
self.model.write(res1[0], {'stage_id': s2}, context=self.client.get_user_context())
if res2:
lead_model.write(res2[0], {'stage_id': s1}, context=self.client.get_user_context())
self.model.write(res2[0], {'stage_id': s1}, context=self.client.get_user_context())

@task(5)
def test_activity(self):
activity_model = self.client.get_model('mail.activity')
res = activity_model.create({
"activity_type_id": 4,
"calendar_event_id": False,
"date_deadline": (date(2012, 1 ,1)+timedelta(days=random.randint(1,3650))).isoformat(),
"note": False,
"res_id": self.random_id,
"res_model_id": self.model_id,
"summary": "Todo",
"user_id": self.client.user_id,
}, context=self.client.get_user_context())
activity_model.action_feedback(res, 'Done')
if self.random_id != -1:
res_id = self.model.activity_schedule(self.random_id, date_deadline=(date(2012, 1 ,1)+timedelta(days=random.randint(1,3650))).isoformat())
activity_model = self.client.get_model('mail.activity')
activity_model.action_done(res_id)

@task
def test_pipeline_analysis(self):
lead_model = self.client.get_model('crm.lead')
lead_model.web_read_group(
self.model.web_read_group(
["&", ["type", "=", "opportunity"], "&", ["create_date", ">=", "2022-01-01 00:00:00"], ["create_date", "<=", "2022-12-31 23:59:59"]],
["__count"],
["stage_id", "date_deadline:month"],
Expand Down
Loading

0 comments on commit 2505dbb

Please sign in to comment.