Skip to content

Commit d2f4f1c

Browse files
committed
wip: [tests, documentation] Changes on the mapping and summary of the MISP to STIX export documentation
- Supporting inputs taken from test samples which are now validated MISP format - Generating simplified mapping documentation with a single STIX result for a given MISP sample covering how data is converted depending or not on a `to_ids` flag, to better document the actual behavior implemented recently to convert to both Observable objects and Indicators (and the conversion to other types of objects remains unchanged)
1 parent 5507aa1 commit d2f4f1c

File tree

1 file changed

+113
-92
lines changed

1 file changed

+113
-92
lines changed

tests/update_documentation.py

Lines changed: 113 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
# -*- coding: utf-8 -*-
33

44
import json
5+
from base64 import b64encode
56
from pathlib import Path
67

78
_ROOT_PATH = Path(__file__).parents[1].resolve()
8-
_OBJECT_FEATURES = ('Indicator', 'Observed Data')
9+
_OBJECT_FEATURES = ('indicator', 'observed-data')
910
_PATTERNING_TYPES = (
1011
'sigma',
1112
'snort',
@@ -26,11 +27,11 @@ def __init__(self, filename, feature):
2627
documentation_path = _ROOT_PATH / 'documentation' / 'mapping'
2728
self.__mapping_path = documentation_path / f'{filename}.json'
2829
with open(self.__mapping_path, 'rt', encoding='utf-8') as f:
29-
self._documentation = json.loads(f.read())
30+
self._documentation = json.load(f)
3031
self.__summary_path = documentation_path / f'{filename}_summary.json'
3132
try:
3233
with open(self.__summary_path, 'rt', encoding='utf-8') as f:
33-
self._summary = json.loads(f.read())
34+
self._summary = json.load(f)
3435
except (FileNotFoundError, json.decoder.JSONDecodeError):
3536
self._summary = {}
3637
self._summary_changed = False
@@ -64,10 +65,8 @@ def check_export_mapping(self, feature):
6465
else:
6566
if mapping['MISP'] != self._documentation[name]['MISP']:
6667
self._documentation[name]['MISP'] = mapping['MISP']
67-
for stix_type, stix_object in mapping['STIX'].items():
68-
stixobject = self._documentation[name]['STIX'].get(stix_type, {})
69-
if stix_object != stixobject:
70-
self._documentation[name]['STIX'][stix_type] = stix_object
68+
if mapping['STIX'] != self._documentation[name]['STIX']:
69+
self._documentation[name]['STIX'] = mapping['STIX']
7170
self._check_stix_export_summary(name, mapping['STIX'], feature)
7271
self._write_documentation()
7372
else:
@@ -115,13 +114,40 @@ def _declare_mapping(self, mapping):
115114
def _declare_summary(self, summary):
116115
self.__summary_mapping = summary
117116

117+
def _define_export_summary(self, *object_types):
118+
return ' / '.join(
119+
f'**{object_type.capitalize()}**' for object_type in object_types
120+
)
121+
122+
def _define_import_summary(self, stix_mapping):
123+
types = []
124+
for stix_type, mapping in stix_mapping.items():
125+
stix_object = mapping['STIX']
126+
if stix_type == 'Indicator':
127+
types.append(self._pattern_types(stix_object['pattern']))
128+
elif stix_type == 'Observed Data':
129+
observables = stix_object[1:] if isinstance(stix_object, list) else tuple(stix_object['objects'].values())
130+
types.append(f"{self._observable_types(observables)} (observable)")
131+
else:
132+
types.append(f'**{stix_type}**')
133+
return ' / '.join(types)
134+
118135
def _define_stix20_export_summary(self, stix_mapping):
119-
if all(feature in stix_mapping for feature in _OBJECT_FEATURES):
120-
return self._observable_types(stix_mapping['Observed Data']['objects'].values())
121-
if len(stix_mapping.keys()) == 1 and 'Indicator' in stix_mapping:
122-
indicator_type = self._pattern_types(stix_mapping['Indicator']['pattern'])
123-
return f"{indicator_type} / Custom Object"
124-
return self._define_export_summary(stix_mapping)
136+
if isinstance(stix_mapping, dict):
137+
return self._define_export_summary(stix_mapping['type'])
138+
object_types = {
139+
stix_object['type'] for stix_object in stix_mapping
140+
if stix_object['type'] != 'relationship'
141+
}
142+
if all(object_type in object_types for object_type in _OBJECT_FEATURES):
143+
return self._observable_types(
144+
*(
145+
observable['type'] for stix_object in stix_mapping
146+
if stix_object['type'] == 'observed-data'
147+
for observable in stix_object['objects']
148+
)
149+
)
150+
return self._define_export_summary(*object_types)
125151

126152
def _define_stix20_import_summary(self, stix_mapping):
127153
if all(feature in stix_mapping for feature in _OBJECT_FEATURES):
@@ -132,14 +158,20 @@ def _define_stix20_import_summary(self, stix_mapping):
132158
return self._define_import_summary(stix_mapping)
133159

134160
def _define_stix21_export_summary(self, stix_mapping):
135-
if all(feature in stix_mapping for feature in _OBJECT_FEATURES):
136-
return self._observable_types(stix_mapping['Observed Data'][1:])
137-
if len(stix_mapping.keys()) == 1 and 'Indicator' in stix_mapping:
138-
indicator = stix_mapping['Indicator']
139-
if indicator['pattern_type'] in _PATTERNING_TYPES:
140-
return '**Indicator**'
141-
return f"{self._pattern_types(indicator['pattern'])} / Custom Object"
142-
return self._define_export_summary(stix_mapping)
161+
if isinstance(stix_mapping, dict):
162+
return self._define_export_summary(stix_mapping['type'])
163+
object_types = {
164+
stix_object['type'] for stix_object in stix_mapping
165+
if stix_object['type'] != 'relationship'
166+
}
167+
if all(object_type in object_types for object_type in _OBJECT_FEATURES):
168+
return self._observable_types(
169+
*(
170+
object_type for object_type in object_types
171+
if object_type not in _OBJECT_FEATURES
172+
)
173+
)
174+
return self._define_export_summary(*object_types)
143175

144176
def _define_stix21_import_summary(self, stix_mapping):
145177
if all(feature in stix_mapping for feature in _OBJECT_FEATURES):
@@ -151,40 +183,23 @@ def _define_stix21_import_summary(self, stix_mapping):
151183
return f"{self._pattern_types(indicator['pattern'])} / Custom Object"
152184
return self._define_import_summary(stix_mapping)
153185

154-
def _define_export_summary(self, stix_mapping):
155-
types = []
156-
for stix_type, stix_object in stix_mapping.items():
157-
if stix_type == 'Indicator':
158-
types.append(self._pattern_types(stix_object['pattern']))
159-
elif stix_type == 'Observed Data':
160-
observables = stix_object[1:] if isinstance(stix_object, list) else tuple(stix_object['objects'].values())
161-
types.append(f"{self._observable_types(observables)} (observable)")
162-
else:
163-
types.append(f'**{stix_type}**')
164-
return ' / '.join(types)
165-
166-
def _define_import_summary(self, stix_mapping):
167-
types = []
168-
for stix_type, mapping in stix_mapping.items():
169-
stix_object = mapping['STIX']
170-
if stix_type == 'Indicator':
171-
types.append(self._pattern_types(stix_object['pattern']))
172-
elif stix_type == 'Observed Data':
173-
observables = stix_object[1:] if isinstance(stix_object, list) else tuple(stix_object['objects'].values())
174-
types.append(f"{self._observable_types(observables)} (observable)")
175-
else:
176-
types.append(f'**{stix_type}**')
177-
return ' / '.join(types)
178-
179186
@staticmethod
180187
def _observable_type(observable_type):
181188
if observable_type in _SUMMARY_MAPPING:
182189
return _SUMMARY_MAPPING[observable_type]
183190
return observable_type.replace('-', ' ').title()
184191

185-
def _observable_types(self, observables):
186-
types = {self._observable_type(observable['type']) for observable in observables}
187-
return f"{' & '.join(sorted(types))} {'Objects' if len(observables) > 1 else 'Object'}"
192+
def _observable_types(self, *observable_types):
193+
types = ' & '.join(
194+
sorted(
195+
set(
196+
' '.join(observable_type.split('-')).title()
197+
for observable_type in observable_types
198+
)
199+
)
200+
)
201+
objects = 'Objects' if len(observable_types) > 1 else 'Object'
202+
return f"{types} {objects} and IoCs described in Indicator (pattern)"
188203

189204
def _order_mapping(self, name):
190205
return {key: attribute for key, attribute in sorted(getattr(self, name).items())}
@@ -195,49 +210,49 @@ def _pattern_types(self, pattern):
195210
types.add(self._observable_type(part.split(':')[0]))
196211
return f"{' & '.join(types)} {'Objects' if len(types) > 1 else 'Object'} (pattern)"
197212

198-
def _replace_data(self, attribute, name, stix_mapping):
199-
data = attribute['data']
213+
def _replace_data(self, name, misp_mapping, stix_mapping):
214+
data = misp_mapping['data']
200215
short_data = f"{data[:23]}[...]{data[-23:]}"
201-
attribute['data'] = short_data
216+
misp_mapping['data'] = short_data
202217
getattr(self, self.data_replacement[name].format(self.feature))(stix_mapping, data, short_data)
203218

204219
@staticmethod
205220
def _replace_export_file_data(mapping, data, short_data):
206-
if mapping.get('Indicator', {}).get('pattern') is not None:
207-
mapping['Indicator']['pattern'] = mapping['Indicator']['pattern'].replace(data, short_data)
208-
if isinstance(mapping['Observed Data'], list):
209-
for observable in mapping['Observed Data'][1:]:
210-
if observable['type'] == 'artifact':
211-
observable['payload_bin'] = observable['payload_bin'].replace(data, short_data)
212-
break
213-
else:
214-
for index, observable in mapping['Observed Data']['objects'].items():
215-
if observable['type'] == 'artifact':
216-
mapping['Observed Data']['objects'][index]['payload_bin'] = observable['payload_bin'].replace(data, short_data)
217-
break
221+
for stix_object in mapping:
222+
if stix_object['type'] == 'indicator':
223+
stix_object['pattern'] = stix_object['pattern'].replace(data, short_data)
224+
continue
225+
if stix_object['type'] == 'observed-data' and 'objects' in stix_object:
226+
for index, observable in stix_object['objects'].items():
227+
if observable['type'] == 'artifact':
228+
stix_object['objects'][index]['payload_bin'] = short_data
229+
continue
230+
continue
231+
if stix_object['type'] == 'artifact':
232+
stix_object['payload_bin'] = short_data
218233

219234
@staticmethod
220235
def _replace_import_file_data(mapping, data, short_data):
221236
if isinstance(mapping, list):
222237
for observable in mapping[1:]:
223238
if observable['type'] == 'artifact':
224-
observable['payload_bin'] = observable['payload_bin'].replace(data, short_data)
239+
observable['payload_bin'] = short_data
225240
break
226241
else:
227242
if mapping['type'] == 'indicator':
228243
mapping['pattern'] = mapping['pattern'].replace(data, short_data)
229244
else:
230245
for index, observable in mapping['objects'].items():
231246
if observable['type'] == 'artifact':
232-
mapping['objects'][index]['payload_bin'] = observable['payload_bin'].replace(data, short_data)
247+
mapping['objects'][index]['payload_bin'] = short_data
233248

234249
def _write_documentation(self):
235250
with open(self.mapping_path, 'wt', encoding='utf-8') as f:
236-
f.write(json.dumps(self._order_mapping('_documentation'), indent=4))
251+
f.write(json.dumps(dict(self._order_mapping('_documentation')), indent=4))
237252

238253
def _write_summary(self):
239254
with open(self.summary_path, 'wt', encoding='utf-8') as f:
240-
f.write(json.dumps(self._order_mapping('_summary'), indent=4))
255+
f.write(json.dumps(dict(self._order_mapping('_summary')), indent=4))
241256

242257

243258
class AttributesDocumentationUpdater(DocumentationUpdater):
@@ -255,7 +270,7 @@ def data_replacement(cls):
255270

256271
def _check_data(self, attribute_type, mapping):
257272
if 'data' in mapping['MISP'] and len(mapping['MISP']['data']) > 51:
258-
self._replace_data(mapping['MISP'], attribute_type, mapping['STIX'])
273+
self._replace_data(attribute_type, mapping['MISP'], mapping['STIX'])
259274

260275
def _load_attributes_mapping(self, attributes_mapping):
261276
self._declare_summary(attributes_mapping.pop('summary', {}))
@@ -307,13 +322,12 @@ def _check_data(self, name, mapping):
307322
if isinstance(mapping['MISP'], list):
308323
for misp_object in mapping['MISP']:
309324
for attribute in misp_object['Attribute']:
310-
if 'data' in attribute and attribute['data'] is not None and len(attribute['data']) > 51:
311-
self._replace_data(attribute, name, mapping['STIX'])
325+
if 'data' in attribute and len(attribute['data']) > 51:
326+
self._replace_data(name, attribute, mapping['STIX'])
312327
else:
313328
for attribute in mapping['MISP']['Attribute']:
314329
if 'data' in attribute and len(attribute['data']) > 51:
315-
self._replace_data(attribute, name, mapping['STIX'])
316-
330+
self._replace_data(name, attribute, mapping['STIX'])
317331

318332
def _load_objects_mapping(self, objects_mapping):
319333
self._declare_summary(objects_mapping.pop('summary', {}))
@@ -327,40 +341,47 @@ def _load_objects_mapping(self, objects_mapping):
327341
self._declare_mapping(objects_mapping)
328342

329343
def _replace_export_account_data(self, mapping, data, short_data):
330-
mapping['Indicator']['pattern'] = mapping['Indicator']['pattern'].replace(data, short_data)
331-
if isinstance(mapping['Observed Data'], list):
332-
for observable in mapping['Observed Data'][1:]:
333-
self._replace_custom_field(observable, data, short_data)
334-
else:
335-
for observable in mapping['Observed Data']['objects'].values():
336-
self._replace_custom_field(observable, data, short_data)
344+
for stix_object in mapping:
345+
if stix_object['type'] == 'indicator':
346+
stix_object['pattern'] = stix_object['pattern'].replace(data, short_data)
347+
continue
348+
if stix_object['type'] == 'observed-data':
349+
if 'objects' in stix_object:
350+
for observable in stix_object['objects'].values():
351+
self._replace_custom_field(observable, short_data)
352+
continue
353+
if stix_object['type'] != 'relationship':
354+
self._replace_custom_field(stix_object, short_data)
337355

338356
def _replace_import_account_data(self, mapping, data, short_data):
339357
if isinstance(mapping, list):
340358
for observable in mapping[1:]:
341-
self._replace_custom_field(observable, data, short_data)
359+
self._replace_custom_field(observable, short_data)
342360
else:
343361
if mapping['type'] == 'indicator':
344362
mapping['pattern'] = mapping['pattern'].replace(data, short_data)
345363
else:
346364
for observable in mapping['objects'].values():
347-
self._replace_custom_field(observable, data, short_data)
365+
self._replace_custom_field(observable, short_data)
348366

349367
@staticmethod
350-
def _replace_custom_field(mapping, data, short_data):
368+
def _replace_custom_field(mapping, short_data):
351369
for feature, value in mapping.items():
352370
if feature.startswith('x_misp_') and isinstance(value, dict):
353-
value['data'] = value['data'].replace(data, short_data)
371+
value['data'] = short_data
354372
break
355373

356-
def _replace_export_identity_data(self, mapping, data, short_data):
357-
self._replace_custom_field(mapping['Identity'], data, short_data)
374+
def _replace_export_identity_data(self, mapping, _, short_data):
375+
self._replace_custom_field(mapping, short_data)
358376

359-
def _replace_import_identity_data(self, mapping, data, short_data):
360-
self._replace_custom_field(mapping, data, short_data)
377+
def _replace_import_identity_data(self, mapping, _, short_data):
378+
self._replace_custom_field(mapping, short_data)
361379

362-
def _replace_export_annotation_data(self, mapping, data, short_data):
363-
self._replace_custom_field(mapping['Note'], data, short_data)
380+
def _replace_export_annotation_data(self, mapping, _, short_data):
381+
for stix_object in mapping:
382+
if stix_object['type'] != 'note':
383+
continue
384+
self._replace_custom_field(stix_object, short_data)
364385

365-
def _replace_import_annotation_data(self, mapping, data, short_data):
366-
self._replace_custom_field(mapping, data, short_data)
386+
def _replace_import_annotation_data(self, mapping, _, short_data):
387+
self._replace_custom_field(mapping, short_data)

0 commit comments

Comments
 (0)