Skip to content

Commit 5b48af8

Browse files
committed
Disk errors handling
1 parent da15bba commit 5b48af8

File tree

3 files changed

+40
-8
lines changed

3 files changed

+40
-8
lines changed

logstash_async/database.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from logstash_async.constants import constants
1212
from logstash_async.utils import ichunked
1313

14-
1514
DATABASE_SCHEMA_STATEMENTS = [
1615
'''
1716
CREATE TABLE IF NOT EXISTS `event` (
@@ -29,6 +28,10 @@ class DatabaseLockedError(Exception):
2928
pass
3029

3130

31+
class DatabaseDiskIOError(Exception):
32+
pass
33+
34+
3235
class DatabaseCache(Cache):
3336
"""
3437
Backend implementation for python-logstash-async. Keeps messages on disk in a SQL-lite DB
@@ -47,8 +50,8 @@ def __init__(self, path, event_ttl=None):
4750

4851
@contextmanager
4952
def _connect(self):
50-
self._open()
5153
try:
54+
self._open()
5255
with self._connection as connection:
5356
yield connection
5457
except sqlite3.OperationalError:
@@ -96,6 +99,12 @@ def _handle_sqlite_error(self):
9699
_, exc, _ = sys.exc_info()
97100
if str(exc) == 'database is locked':
98101
raise DatabaseLockedError from exc
102+
if str(exc) == 'disk I/O error':
103+
raise DatabaseDiskIOError from exc
104+
if str(exc) == "unable to open database file":
105+
raise DatabaseDiskIOError from exc
106+
if str(exc) == "attempt to write a readonly database":
107+
raise DatabaseDiskIOError from exc
99108

100109
# ----------------------------------------------------------------------
101110
def get_queued_events(self):

logstash_async/worker.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from limits.strategies import FixedWindowRateLimiter
1515

1616
from logstash_async.constants import constants
17-
from logstash_async.database import DatabaseCache, DatabaseLockedError
17+
from logstash_async.database import DatabaseCache, DatabaseLockedError, DatabaseDiskIOError
1818
from logstash_async.memory_cache import MemoryCache
1919
from logstash_async.utils import safe_log_via_print
2020

@@ -128,7 +128,7 @@ def _fetch_events(self):
128128
self._flush_queued_events(force=force_flush)
129129
self._delay_processing()
130130
self._expire_events()
131-
except (DatabaseLockedError, ProcessingError):
131+
except (DatabaseLockedError, ProcessingError, DatabaseDiskIOError):
132132
if self._shutdown_requested():
133133
return
134134

@@ -150,6 +150,13 @@ def _process_event(self):
150150
self._queue.qsize(),
151151
exc=exc)
152152
raise
153+
except DatabaseDiskIOError as exc:
154+
self._safe_log(
155+
'debug',
156+
'Disk I/O error, will try again later (queue length %d)',
157+
self._queue.qsize(),
158+
exc=exc)
159+
raise
153160
except Exception as exc:
154161
self._log_processing_error(exc)
155162
raise ProcessingError from exc
@@ -160,7 +167,7 @@ def _process_event(self):
160167
def _expire_events(self):
161168
try:
162169
self._database.expire_events()
163-
except DatabaseLockedError:
170+
except (DatabaseLockedError, DatabaseDiskIOError):
164171
# Nothing to handle, if it fails, we will either successfully publish
165172
# these messages next time or we will delete them on the next pass.
166173
pass
@@ -242,6 +249,13 @@ def _fetch_queued_events_for_flush(self):
242249
'Database is locked, will try again later (queue length %d)',
243250
self._queue.qsize(),
244251
exc=exc)
252+
except DatabaseDiskIOError as exc:
253+
self._safe_log(
254+
'debug',
255+
'Disk I/O error, will try again later (queue length %d)',
256+
self._queue.qsize(),
257+
exc=exc)
258+
raise
245259
except Exception as exc:
246260
# just log the exception and hope we can recover from the error
247261
self._safe_log('exception', 'Error retrieving queued events: %s', exc, exc=exc)
@@ -252,7 +266,7 @@ def _fetch_queued_events_for_flush(self):
252266
def _delete_queued_events_from_database(self):
253267
try:
254268
self._database.delete_queued_events()
255-
except DatabaseLockedError:
269+
except (DatabaseLockedError, DatabaseDiskIOError):
256270
pass # nothing to handle, if it fails, we delete those events in a later run
257271

258272
# ----------------------------------------------------------------------

tests/database_test.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@
77
import sqlite3
88
import time
99
import unittest
10+
from stat import S_IREAD, S_IRGRP, S_IROTH, S_IWUSR
1011

1112
from logstash_async.constants import constants
12-
from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache
13+
from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache, DatabaseDiskIOError
1314

1415

1516
# pylint: disable=protected-access
1617

1718

1819
class DatabaseCacheTest(unittest.TestCase):
19-
2020
TEST_DB_FILENAME = "test.db"
2121
_connection = None
2222

@@ -63,6 +63,15 @@ def close_connection(cls):
6363
cls._connection.close()
6464
cls._connection = None
6565

66+
# ----------------------------------------------------------------------
67+
def testIOException(self):
68+
self.cache.add_event("message")
69+
with self.assertRaises(DatabaseDiskIOError):
70+
# change permissions to produce error
71+
os.chmod(os.path.abspath("test.db"), S_IREAD | S_IRGRP | S_IROTH)
72+
self.cache.add_event("message")
73+
os.chmod(os.path.abspath("test.db"), S_IWUSR | S_IREAD)
74+
6675
# ----------------------------------------------------------------------
6776
def test_add_event(self):
6877
self.cache.add_event("message")

0 commit comments

Comments
 (0)