Skip to content

Commit 9bd3fe1

Browse files
committed
[PERF] bus: do not build registry to check session
When there are more databases than what the LRU cache can hold, the gevent worker struggles to keep notifications flowing. Each outgoing message requires a registry, which is highly inefficient. This commit is a first step towards removing the need for registry when dispatching outgoing message. From now on, checking the session won't require a registry. The result of `_get_session_token_query_params` is cached, allowing to compute the session token with a direct query. Due to this change, dispatching notifications won't upgrade the `res.device` anymore which is considered acceptable. Part-of: odoo#235746 Signed-off-by: Julien Castiaux (juc) <[email protected]>
1 parent 8066998 commit 9bd3fe1

File tree

4 files changed

+122
-25
lines changed

4 files changed

+122
-25
lines changed

addons/bus/session_helpers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import hashlib
2+
import hmac
3+
import time
4+
5+
from odoo.api import Environment
6+
from odoo.modules.registry import Registry
7+
from odoo.sql_db import SQL
8+
from odoo.tools.lru import LRU
9+
from odoo.tools.misc import consteq
10+
11+
_query_params_by_user = LRU(8192)
12+
13+
14+
def _get_session_token_query_params(cr, session):
15+
"""
16+
Retrieve the session token query parameters like
17+
`res.users@_get_session_token_query_params`, but with caching to avoid building the
18+
full registry. The cache is invalidated when `registry.registry_sequence` has changed.
19+
"""
20+
cache_key = (cr.dbname, session.uid)
21+
if cached_value := _query_params_by_user.get(cache_key):
22+
cr.execute('SELECT MAX(id) FROM orm_signaling_registry')
23+
if cached_value['registry_sequence'] == cr.fetchone()[0]:
24+
return cached_value['query_params']
25+
Registry(cr.dbname).check_signaling()
26+
env = new_env(cr, session)
27+
params = env.user._get_session_token_query_params()
28+
_query_params_by_user[cache_key] = {
29+
'registry_sequence': env.registry.registry_sequence,
30+
'query_params': params,
31+
}
32+
return params
33+
34+
35+
def check_session(cr, session):
36+
session._delete_old_sessions()
37+
if 'deletion_time' in session and session['deletion_time'] <= time.time():
38+
return False
39+
query_params = _get_session_token_query_params(cr, session)
40+
cr.execute(
41+
SQL(
42+
'SELECT %(select)s FROM %(from)s %(joins)s WHERE %(where)s GROUP BY %(group_by)s',
43+
**query_params,
44+
),
45+
)
46+
if cr.rowcount != 1:
47+
return False
48+
row = cr.fetchone()
49+
key_tuple = tuple(
50+
(col.name, row[i]) for i, col in enumerate(cr.description) if row[i] is not None
51+
)
52+
key = str(key_tuple).encode()
53+
token = hmac.new(key, session.sid.encode(), hashlib.sha256).hexdigest()
54+
return consteq(token, session.session_token)
55+
56+
57+
def new_env(cr, session, *, set_lang=False):
58+
"""
59+
Create a new environment. Make sure the transaction has a `default_env` and
60+
if requested, set the language of the user in the context.
61+
"""
62+
uid = session.uid
63+
ctx = dict(session.context, lang=None) # lang is not guaranteed to be correct
64+
env = Environment(cr, uid, ctx)
65+
if set_lang:
66+
lang = env['res.lang']._get_code(ctx['lang'])
67+
env = env(context=dict(ctx, lang=lang))
68+
if not env.transaction.default_env:
69+
env.transaction.default_env = env
70+
return env

addons/bus/tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from . import test_ir_websocket
77
from . import test_notify
88
from . import test_websocket_caryall
9+
from . import test_websocket_check_session
910
from . import test_close_websocket_after_tour
1011
from . import test_websocket_controller
1112
from . import test_websocket_rate_limiting
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import time
2+
from datetime import timedelta
3+
from unittest.mock import patch
4+
5+
from freezegun import freeze_time
6+
7+
from odoo.tests import HttpCase, new_test_user
8+
9+
from ..session_helpers import _get_session_token_query_params, check_session
10+
11+
12+
class TestWebsocketCheckSession(HttpCase):
13+
def test_check_session_deletion_time(self):
14+
bob = new_test_user(self.env, "bob", groups="base.group_user")
15+
self.authenticate(bob.login, bob.password)
16+
with freeze_time() as frozen_time:
17+
self.session["deletion_time"] = time.time() + 3600
18+
self.assertTrue(check_session(self.env.cr, self.session))
19+
frozen_time.tick(delta=timedelta(hours=2))
20+
self.assertFalse(check_session(self.env.cr, self.session))
21+
22+
def test_check_session_token_field_changes(self):
23+
bob = new_test_user(self.env, "bob", groups="base.group_user")
24+
self.authenticate(bob.login, bob.password)
25+
self.assertTrue(check_session(self.env.cr, self.session))
26+
bob.password = "bob_new_password"
27+
self.assertFalse(check_session(self.env.cr, self.session))
28+
29+
def test_update_cache_when_registry_changes(self):
30+
bob = new_test_user(self.env, "bob", groups="base.group_user")
31+
self.authenticate(bob.login, bob.password)
32+
bob_query_params = _get_session_token_query_params(self.env.cr, self.session)
33+
self.assertIs(
34+
bob_query_params, _get_session_token_query_params(self.env.cr, self.session)
35+
)
36+
jane = new_test_user(self.env, "jane", groups="base.group_user")
37+
self.authenticate(jane.login, jane.password)
38+
current_registry_sequence = self.env.registry.registry_sequence
39+
# Signaling is patched during test, simulate first entry coming from an old registry.
40+
with patch.object(self.env.registry, "registry_sequence", current_registry_sequence - 1):
41+
jane_query_params = _get_session_token_query_params(self.env.cr, self.session)
42+
next_jane_query_params = _get_session_token_query_params(self.env.cr, self.session)
43+
self.assertIsNot(jane_query_params, next_jane_query_params)
44+
self.assertIs(next_jane_query_params, _get_session_token_query_params(self.env.cr, self.session))

addons/bus/websocket.py

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
from werkzeug.exceptions import BadRequest, HTTPException, ServiceUnavailable
2727

2828
import odoo
29-
from odoo import api, modules
29+
from odoo import modules
3030
from .models.bus import dispatch
31+
from .session_helpers import check_session, new_env
3132
from odoo.http import root, Request, Response, SessionExpiredException, get_default_session
3233
from odoo.modules.registry import Registry
3334
from odoo.service import model as service_model
3435
from odoo.service.server import CommonServer
35-
from odoo.service.security import check_session
3636
from odoo.tools import config
3737

3838
_logger = logging.getLogger(__name__)
@@ -620,7 +620,7 @@ def _terminate(self):
620620
dispatch.unsubscribe(self)
621621
self._trigger_lifecycle_event(LifecycleEvent.CLOSE)
622622
with acquire_cursor(self._db) as cr:
623-
env = self.new_env(cr, self._session)
623+
env = new_env(cr, self._session)
624624
env["ir.websocket"]._on_websocket_closed(self._cookies)
625625

626626
def _handle_control_frame(self, frame):
@@ -697,7 +697,7 @@ def _trigger_lifecycle_event(self, event_type):
697697
if not self.__event_callbacks[event_type]:
698698
return
699699
with closing(acquire_cursor(self._db)) as cr:
700-
env = self.new_env(cr, self._session, set_lang=True)
700+
env = new_env(cr, self._session, set_lang=True)
701701
for callback in self.__event_callbacks[event_type]:
702702
try:
703703
service_model.retrying(functools.partial(callback, env, self), env)
@@ -726,8 +726,7 @@ def _assert_session_validity(self):
726726
if session.uid is None:
727727
return
728728
with acquire_cursor(session.db) as cr:
729-
env = self.new_env(cr, session)
730-
if not check_session(session, env):
729+
if not check_session(cr, session):
731730
raise SessionExpiredException()
732731

733732
def _send_control_command(self, command, data=None):
@@ -754,7 +753,7 @@ def _process_control_command(self, command, data):
754753
def _dispatch_bus_notifications(self):
755754
self._waiting_for_dispatch = False
756755
with acquire_cursor(self._session.db) as cr:
757-
env = self.new_env(cr, self._session)
756+
env = new_env(cr, self._session)
758757
notifications = env['bus.bus']._poll(
759758
self._channels, self._last_notif_sent_id, [n[0] for n in self._notif_history]
760759
)
@@ -784,23 +783,6 @@ def _dispatch_bus_notifications(self):
784783
self._notif_history = self._notif_history[last_index + 1 :]
785784
self._send(notifications)
786785

787-
def new_env(self, cr, session, *, set_lang=False):
788-
"""
789-
Create a new environment.
790-
Make sure the transaction has a `default_env` and if requested, set the
791-
language of the user in the context.
792-
"""
793-
uid = session.uid
794-
# lang is not guaranteed to be correct, set None
795-
ctx = dict(session.context, lang=None)
796-
env = api.Environment(cr, uid, ctx)
797-
if set_lang:
798-
lang = env['res.lang']._get_code(ctx['lang'])
799-
env = env(context=dict(ctx, lang=lang))
800-
if not env.transaction.default_env:
801-
env.transaction.default_env = env
802-
return env
803-
804786

805787
class TimeoutManager:
806788
"""
@@ -913,7 +895,7 @@ def serve_websocket_message(self, message):
913895
raise InvalidDatabaseException() from exc
914896

915897
with closing(acquire_cursor(self.db)) as cr:
916-
self.env = self.ws.new_env(cr, self.session, set_lang=True)
898+
self.env = new_env(cr, self.session, set_lang=True)
917899
service_model.retrying(
918900
functools.partial(self._serve_ir_websocket, event_name, data),
919901
self.env,

0 commit comments

Comments
 (0)