Skip to content

Commit 19b6d96

Browse files
authored
Changed queue to use JSON serialization (#5032)
1 parent: c04862c · commit: 19b6d96

File tree

7 files changed

+161
-55
lines changed

7 files changed

+161
-55
lines changed

config/dpkg/python3-plaso.install

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ usr/lib/python3*/dist-packages/plaso/*/*.py
33
usr/lib/python3*/dist-packages/plaso/*/*.yaml
44
usr/lib/python3*/dist-packages/plaso/*/*/*.py
55
usr/lib/python3*/dist-packages/plaso/*/*/*.yaml
6-
usr/lib/python3*/dist-packages/plaso*.egg-info
6+
usr/lib/python3*/dist-packages/plaso*.dist-info

plaso/containers/events.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ class EventData(interface.AttributeContainer):
209209

210210
CONTAINER_TYPE = 'event_data'
211211

212+
SCHEMA = {
213+
'_event_data_stream_identifier': 'AttributeContainerIdentifier',
214+
'_event_values_hash': 'str',
215+
'_parser_chain': 'str',
216+
'data_type': 'str'}
217+
212218
_SERIALIZABLE_PROTECTED_ATTRIBUTES = [
213219
'_event_data_stream_identifier',
214220
'_event_values_hash',
@@ -499,6 +505,25 @@ def SetEventIdentifier(self, event_identifier):
499505
self._event_identifier = event_identifier
500506

501507

508+
class EventTripple(interface.AttributeContainer):
509+
"""Event tripple.
510+
511+
Attributes:
512+
event (EventObject): event.
513+
event_data (EventData): event data.
514+
event_data_stream (EventDataStream): event data stream.
515+
"""
516+
517+
CONTAINER_TYPE = 'event_tripple'
518+
519+
def __init__(self):
520+
"""Initializes an event tripple."""
521+
super(EventTripple, self).__init__()
522+
self.event = None
523+
self.event_data = None
524+
self.event_data_stream = None
525+
526+
502527
# TODO: the YearLessLogHelper attribute container is kept for backwards
503528
# compatibility remove once storage format 20230327 is obsolete.
504529
class YearLessLogHelper(interface.AttributeContainer):
@@ -557,4 +582,4 @@ def SetEventDataStreamIdentifier(self, event_data_stream_identifier):
557582

558583
manager.AttributeContainersManager.RegisterAttributeContainers([
559584
DateLessLogHelper, EventData, EventDataStream, EventObject, EventTag,
560-
YearLessLogHelper])
585+
EventTripple, YearLessLogHelper])

plaso/multi_process/analysis_engine.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,12 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
142142

143143
for event_queue in self._event_queues.values():
144144
# TODO: Check for premature exit of analysis plugins.
145-
event_queue.PushItem((event, event_data, event_data_stream))
145+
event_tripple = events.EventTripple()
146+
event_tripple.event = event
147+
event_tripple.event_data = event_data
148+
event_tripple.event_data_stream = event_data_stream
149+
150+
event_queue.PushItem(event_tripple)
146151

147152
self._number_of_consumed_events += 1
148153

plaso/multi_process/analysis_process.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def _Main(self):
165165
logger.debug('ConsumeItems exiting, dequeued QueueAbort object.')
166166
break
167167

168-
self._ProcessEvent(self._analysis_mediator, *queued_object)
168+
self._ProcessEventTripple(self._analysis_mediator, queued_object)
169169

170170
self._number_of_consumed_events += 1
171171

@@ -230,19 +230,18 @@ def _Main(self):
230230
except errors.QueueAlreadyClosed:
231231
logger.error('Queue for {0:s} was already closed.'.format(self.name))
232232

233-
def _ProcessEvent(self, mediator, event, event_data, event_data_stream):
234-
"""Processes an event.
233+
def _ProcessEventTripple(self, mediator, event_tripple):
234+
"""Processes an event tripple.
235235
236236
Args:
237237
mediator (AnalysisMediator): mediates interactions between
238238
analysis plugins and other components, such as storage and dfvfs.
239-
event (EventObject): event.
240-
event_data (EventData): event data.
241-
event_data_stream (EventDataStream): event data stream.
239+
event_tripple (EventTripple): event tripple.
242240
"""
243241
try:
244242
self._analysis_plugin.ExamineEvent(
245-
mediator, event, event_data, event_data_stream)
243+
mediator, event_tripple.event, event_tripple.event_data,
244+
event_tripple.event_data_stream)
246245

247246
except Exception as exception: # pylint: disable=broad-except
248247
# TODO: write analysis error and change logger to debug only.

plaso/serializer/json_serializer.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,21 @@ def _ConvertAttributeContainerToJSON(cls, attribute_container):
6565
dict[str, object]: JSON serialized objects.
6666
"""
6767
if attribute_container.CONTAINER_TYPE not in (
68-
'event_data', 'system_configuration'):
68+
'event_data', 'event_tripple', 'system_configuration'):
6969
return cls.ConvertAttributeContainerToJSON(attribute_container)
7070

7171
json_dict = {
7272
'__type__': 'AttributeContainer',
7373
'__container_type__': attribute_container.CONTAINER_TYPE}
7474

7575
for attribute_name, attribute_value in attribute_container.GetAttributes():
76-
json_dict[attribute_name] = cls._ConvertValueToJSON(attribute_value)
76+
if isinstance(
77+
attribute_value, containers_interface.AttributeContainerIdentifier):
78+
attribute_value = attribute_value.CopyToString()
79+
else:
80+
attribute_value = cls._ConvertValueToJSON(attribute_value)
81+
82+
json_dict[attribute_name] = attribute_value
7783

7884
return json_dict
7985

@@ -148,7 +154,8 @@ def _ConvertJSONToAttributeContainer(cls, json_dict):
148154
# Use __container_type__ to indicate the attribute container type.
149155
container_type = json_dict.get('__container_type__', None)
150156

151-
if container_type not in ('event_data', 'system_configuration'):
157+
if container_type not in (
158+
'event_data', 'event_tripple', 'system_configuration'):
152159
return cls.ConvertJSONToAttributeContainer(json_dict)
153160

154161
attribute_container = cls._CONTAINERS_MANAGER.CreateAttributeContainer(

plaso/storage/sqlite/sqlite_file.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def _CreateAttributeContainerFromRow(
7575
AttributeContainer: attribute container.
7676
"""
7777
schema = self._GetAttributeContainerSchema(container_type)
78-
if schema:
78+
if schema and container_type != self._CONTAINER_TYPE_EVENT_DATA:
7979
return super(SQLiteStorageFile, self)._CreateAttributeContainerFromRow(
8080
container_type, column_names, row, first_column_index)
8181

@@ -106,7 +106,7 @@ def _CreateAttributeContainerTable(self, container_type):
106106
an unsupported attribute container is provided.
107107
"""
108108
schema = self._GetAttributeContainerSchema(container_type)
109-
if schema:
109+
if schema and container_type != self._CONTAINER_TYPE_EVENT_DATA:
110110
super(SQLiteStorageFile, self)._CreateAttributeContainerTable(
111111
container_type)
112112
else:
@@ -276,7 +276,7 @@ def _WriteNewAttributeContainer(self, container):
276276
OSError: when there is an error querying the storage file.
277277
"""
278278
schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE)
279-
if schema:
279+
if schema and container.CONTAINER_TYPE != self._CONTAINER_TYPE_EVENT_DATA:
280280
super(SQLiteStorageFile, self)._WriteNewAttributeContainer(container)
281281
else:
282282
next_sequence_number = self._GetAttributeContainerNextSequenceNumber(
@@ -328,7 +328,7 @@ def GetAttributeContainerByIndex(self, container_type, index):
328328
the storage file.
329329
"""
330330
schema = self._GetAttributeContainerSchema(container_type)
331-
if schema:
331+
if schema and container_type != self._CONTAINER_TYPE_EVENT_DATA:
332332
container = super(SQLiteStorageFile, self).GetAttributeContainerByIndex(
333333
container_type, index)
334334

@@ -401,7 +401,7 @@ def GetAttributeContainers(self, container_type, filter_expression=None):
401401
OSError: when there is an error querying the storage file.
402402
"""
403403
schema = self._GetAttributeContainerSchema(container_type)
404-
if schema:
404+
if schema and container_type != self._CONTAINER_TYPE_EVENT_DATA:
405405
for container in super(SQLiteStorageFile, self).GetAttributeContainers(
406406
container_type, filter_expression=filter_expression):
407407
# TODO: the YearLessLogHelper attribute container is kept for backwards

tests/serializer/json_serializer.py

Lines changed: 107 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
class JSONSerializerTestCase(shared_test_lib.BaseTestCase):
3030
"""Tests for a JSON serializer object."""
3131

32+
# pylint: disable=protected-access
33+
3234
def _TestReadSerialized(self, serializer_object, json_dict):
3335
"""Tests the ReadSerialized function.
3436
@@ -78,6 +80,76 @@ class JSONAttributeContainerSerializerTest(JSONSerializerTestCase):
7880

7981
# pylint: disable=protected-access
8082

83+
def CreateTestEventData(self, event_data_stream):
84+
"""Creates a test event data.
85+
86+
Args:
87+
event_data_stream (EventDataStream): event data stream.
88+
89+
Returns:
90+
EventData: event data.
91+
"""
92+
event_data = events.EventData()
93+
event_data._ignored = 'Not serialized'
94+
event_data._parser_chain = 'test_parser'
95+
event_data.data_type = 'test:event2'
96+
97+
event_data.a_tuple = ('some item', [234, 52, 15])
98+
event_data.empty_string = ''
99+
event_data.float = -122.082203542683
100+
event_data.integer = 34
101+
event_data.my_list = ['asf', 4234, 2, 54, 'asf']
102+
event_data.null_value = None
103+
event_data.string = 'Normal string'
104+
event_data.unicode_string = 'And I am a unicorn.'
105+
event_data.zero_integer = 0
106+
107+
event_data_stream_identifier = event_data_stream.GetIdentifier()
108+
event_data.SetEventDataStreamIdentifier(event_data_stream_identifier)
109+
110+
return event_data
111+
112+
def CreateTestEventDataStream(self):
113+
"""Creates a test event data stream.
114+
115+
Returns:
116+
EventDataSteam: event data stream.
117+
"""
118+
test_file_path = self._GetTestFilePath(['ímynd.dd'])
119+
120+
volume_path_spec = path_spec_factory.Factory.NewPathSpec(
121+
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
122+
test_path_spec = path_spec_factory.Factory.NewPathSpec(
123+
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
124+
parent=volume_path_spec)
125+
126+
event_data_stream = events.EventDataStream()
127+
event_data_stream.md5_hash = 'e3df0d2abd2c27fbdadfb41a47442520'
128+
event_data_stream.path_spec = test_path_spec
129+
130+
return event_data_stream
131+
132+
def CreateTestEventObject(self, event_data):
133+
"""Creates a test event object.
134+
135+
Args:
136+
event_data (EventData): event data.
137+
138+
Returns:
139+
EventObject: an event object.
140+
"""
141+
test_date_time = dfdatetime_posix_time.PosixTime(timestamp=1621839644)
142+
143+
event = events.EventObject()
144+
event.date_time = test_date_time
145+
event.timestamp = 1621839644
146+
event.timestamp_desc = definitions.TIME_DESCRIPTION_MODIFICATION
147+
148+
event_data_identifier = event_data.GetIdentifier()
149+
event.SetEventDataIdentifier(event_data_identifier)
150+
151+
return event
152+
81153
def testReadAndWriteSerializedAnalysisReport(self):
82154
"""Test ReadSerialized and WriteSerialized of AnalysisReport."""
83155
expected_report_text = (
@@ -110,21 +182,8 @@ def testReadAndWriteSerializedAnalysisReport(self):
110182

111183
def testReadAndWriteSerializedEventData(self):
112184
"""Test ReadSerialized and WriteSerialized of EventData."""
113-
expected_event_data = events.EventData()
114-
expected_event_data._event_data_stream_identifier = 'event_data_stream.1'
115-
expected_event_data._ignored = 'Not serialized'
116-
expected_event_data._parser_chain = 'test_parser'
117-
expected_event_data.data_type = 'test:event2'
118-
119-
expected_event_data.a_tuple = ('some item', [234, 52, 15])
120-
expected_event_data.empty_string = ''
121-
expected_event_data.float = -122.082203542683
122-
expected_event_data.integer = 34
123-
expected_event_data.my_list = ['asf', 4234, 2, 54, 'asf']
124-
expected_event_data.null_value = None
125-
expected_event_data.string = 'Normal string'
126-
expected_event_data.unicode_string = 'And I am a unicorn.'
127-
expected_event_data.zero_integer = 0
185+
expected_event_data_stream = self.CreateTestEventDataStream()
186+
expected_event_data = self.CreateTestEventData(expected_event_data_stream)
128187

129188
json_string = (
130189
json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
@@ -140,7 +199,8 @@ def testReadAndWriteSerializedEventData(self):
140199
self.assertIsInstance(event_data, events.EventData)
141200

142201
expected_event_data_dict = {
143-
'_event_data_stream_identifier': 'event_data_stream.1',
202+
'_event_data_stream_identifier': (
203+
expected_event_data.GetEventDataStreamIdentifier().CopyToString()),
144204
'_parser_chain': 'test_parser',
145205
'a_tuple': ('some item', [234, 52, 15]),
146206
'data_type': 'test:event2',
@@ -157,17 +217,7 @@ def testReadAndWriteSerializedEventData(self):
157217

158218
def testReadAndWriteSerializedEventDataStream(self):
159219
"""Test ReadSerialized and WriteSerialized of EventDataStream."""
160-
test_file_path = self._GetTestFilePath(['ímynd.dd'])
161-
162-
volume_path_spec = path_spec_factory.Factory.NewPathSpec(
163-
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
164-
test_path_spec = path_spec_factory.Factory.NewPathSpec(
165-
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
166-
parent=volume_path_spec)
167-
168-
expected_event_data_stream = events.EventDataStream()
169-
expected_event_data_stream.md5_hash = 'e3df0d2abd2c27fbdadfb41a47442520'
170-
expected_event_data_stream.path_spec = test_path_spec
220+
expected_event_data_stream = self.CreateTestEventDataStream()
171221

172222
json_string = (
173223
json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
@@ -184,21 +234,17 @@ def testReadAndWriteSerializedEventDataStream(self):
184234

185235
expected_event_data_stream_dict = {
186236
'md5_hash': 'e3df0d2abd2c27fbdadfb41a47442520',
187-
'path_spec': test_path_spec}
237+
'path_spec': expected_event_data_stream.path_spec}
188238

189239
event_data_stream_dict = event_data_stream.CopyToDict()
190240

191241
self.assertEqual(event_data_stream_dict, expected_event_data_stream_dict)
192242

193243
def testReadAndWriteSerializedEventObject(self):
194244
"""Test ReadSerialized and WriteSerialized of EventObject."""
195-
test_date_time = dfdatetime_posix_time.PosixTime(timestamp=1621839644)
196-
197-
expected_event = events.EventObject()
198-
expected_event._event_data_identifier = 'event_data.1'
199-
expected_event.date_time = test_date_time
200-
expected_event.timestamp = 1621839644
201-
expected_event.timestamp_desc = definitions.TIME_DESCRIPTION_MODIFICATION
245+
expected_event_data_stream = self.CreateTestEventDataStream()
246+
expected_event_data = self.CreateTestEventData(expected_event_data_stream)
247+
expected_event = self.CreateTestEventObject(expected_event_data)
202248

203249
json_string = (
204250
json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
@@ -213,8 +259,9 @@ def testReadAndWriteSerializedEventObject(self):
213259
self.assertIsInstance(event, events.EventObject)
214260

215261
expected_event_dict = {
216-
'_event_data_identifier': 'event_data.1',
217-
'date_time': test_date_time.CopyToDateTimeStringISO8601(),
262+
'_event_data_identifier': (
263+
expected_event.GetEventDataIdentifier().CopyToString()),
264+
'date_time': expected_event.date_time.CopyToDateTimeStringISO8601(),
218265
'timestamp': 1621839644,
219266
'timestamp_desc': definitions.TIME_DESCRIPTION_MODIFICATION}
220267

@@ -296,6 +343,29 @@ def testReadAndWriteSerializedEventTag(self):
296343
sorted(event_tag_dict.items()),
297344
sorted(expected_event_tag_dict.items()))
298345

346+
def testReadAndWriteSerializedEventTripple(self):
347+
"""Test ReadSerialized and WriteSerialized of EventTripple."""
348+
expected_event_data_stream = self.CreateTestEventDataStream()
349+
expected_event_data = self.CreateTestEventData(expected_event_data_stream)
350+
expected_event_tripple = events.EventTripple()
351+
expected_event_tripple.event = self.CreateTestEventObject(
352+
expected_event_data)
353+
expected_event_tripple.event_data = expected_event_data
354+
expected_event_tripple.event_data_stream = expected_event_data_stream
355+
356+
json_string = (
357+
json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
358+
expected_event_tripple))
359+
360+
self.assertIsNotNone(json_string)
361+
362+
event_tripple = (
363+
json_serializer.JSONAttributeContainerSerializer.ReadSerialized(
364+
json_string))
365+
366+
self.assertIsNotNone(event_tripple)
367+
self.assertIsInstance(event_tripple, events.EventTripple)
368+
299369
def testReadAndWriteSerializedSession(self):
300370
"""Test ReadSerialized and WriteSerialized of Session."""
301371
expected_session = sessions.Session()

0 commit comments

Comments (0)