-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistributed_conformance_checker.py
139 lines (106 loc) · 4.67 KB
/
distributed_conformance_checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright 2024 Kiel University
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import os
from dotenv import load_dotenv
from pm4py.algo.filtering.log.variants.variants_filter import \
filter_variants_top_k
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.statistics.variants.log.get import get_variants
from activity_node import ActivityNode
from alpha_miner_original import run_original_alpha_miner
from auxiliaries.event_log_adjuster import no_doubled_timestamps
from auxiliaries.file_reader import read_event_log
from auxiliaries.model_calculator import (calculate_model_of_partial_log,
compute_activities_and_mapping)
# Load environment variables from a .env file into os.environ.
load_dotenv()
# Name of the log to analyse, e.g. "SEPSIS"; the actual path is then read
# from the variable "<LOG_NAME>_LOG" below.
# NOTE(review): str(os.getenv(...)) turns a missing variable into the literal
# string "None" instead of failing fast — confirm the .env always defines these.
LOG_NAME = str(os.getenv('CURRENT_LOG'))
# Path to the event-log file for the selected log.
EVENT_LOG = str(os.getenv(f"{LOG_NAME}_LOG"))
# Fraction (0..1) of the most frequent variants used to build the model;
# float(None) raises TypeError if FACTOR is unset.
FACTOR = float(os.getenv("FACTOR"))
# filename for "fitness per partial log"
# filename = f"outputs/{LOG_NAME}_fitness_per_partial_log_{int(FACTOR*100)}_percent.csv"
# filename for "fitness per trace"
# filename = f"outputs/{LOG_NAME}_fitness_per_trace_{int(FACTOR*100)}_percent.csv"
# set up output file
# with open(filename ,"w", encoding="utf-8") as f:
#     f.write("case_id;fitness\n")
# ----------
def get_node_by_id(nodes, node_id):
    """Return the node in *nodes* whose ``activity_id`` equals *node_id*.

    Args:
        nodes: Iterable of objects exposing an ``activity_id`` attribute.
        node_id: The activity id to look up.

    Returns:
        The first matching node, or ``None`` if no node matches (the
        original fell through implicitly; made explicit here).
    """
    return next((n for n in nodes if n.activity_id == node_id), None)
def calculate_conformance_full_log(case_id):
    """Print the conformance (fitness) of the full log processed so far.

    Sums the match/mismatch counters of every node in the module-level
    ``nodes`` list and prints matches / (matches + mismatches).

    Args:
        case_id: Id of the case that was just completed; only used by the
            (currently commented-out) CSV output below.
    """
    total_mismatches = 0
    total_matches = 0
    for node in nodes:
        mismatches, matches = node.get_matches_mismatches_full_log()
        total_mismatches += mismatches
        total_matches += matches
    total = total_matches + total_mismatches
    # Guard against ZeroDivisionError when no events have been counted yet
    # (the original divided unconditionally).
    conformance_metric_log = total_matches / total if total else 0.0
    print(f"\n\nConformance of full log (up till now) = {conformance_metric_log}")
    # with open(filename, "a") as f:
    #     f.write(f"{case_id};{conformance_metric_log}\n")
# ----------
# Create event logs
# Load the raw event log and reduce it to the three columns the checker needs.
log = read_event_log(EVENT_LOG)
log = log.filter(items=["case:concept:name", "concept:name", "time:timestamp"]) # filter changes order
# Restore a deterministic order: group events by case, then by timestamp.
log = log.sort_values(["case:concept:name","time:timestamp"])
# Project helper that resolves events sharing the same timestamp within a case.
log = no_doubled_timestamps(log)
# Build the activity-name <-> server-id mappings and count distinct activities.
server_id_to_activity_name_mapping, activity_name_to_server_id_mapping, activity_count = compute_activities_and_mapping(log)
number_events = log.shape[0]
# Only use a certain percentage of variants for the model
variants = get_variants(log)
# Keep the FACTOR share of the most frequent variants (floored to an int k).
k = math.floor(FACTOR * len(variants))
model_event_log = filter_variants_top_k(log, k=k)
# filter_variants_top_k returns a pm4py EventLog; convert back to a DataFrame.
model_event_log = log_converter.apply(model_event_log, variant=log_converter.Variants.TO_DATA_FRAME)
# Derive the footprint matrix plus start/end activity sets from the reduced log.
model_fm, model_start_activities, model_end_activities = calculate_model_of_partial_log(model_event_log, activity_count, activity_name_to_server_id_mapping)
# run_original_alpha_miner(log)
# run_original_alpha_miner(model_event_log)
# Create nodes
# One ActivityNode per activity id; each gets its column of the footprint
# matrix and flags for being a start/end activity of the model.
nodes = []
for activity_id in activity_name_to_server_id_mapping.values():
    nodes.append(ActivityNode(activity_id, [row[activity_id] for row in model_fm], activity_id in model_start_activities, activity_id in model_end_activities))
# Give every node a reference to the full node list (peer lookup).
for node in nodes:
    node.set_nodes_list(nodes)
## Checking Conformance
# Run through log
# For each event, store the case id of the NEXT event so end-of-trace can be
# detected by comparing adjacent rows (last row gets NaN).
log["case_next_event"] = log["case:concept:name"].shift(-1)
# get fitness of partial log after every case
fitness = []
current_case = None
case_id = 0
# NOTE(review): the loop index k shadows the variant count k computed above;
# the old value is no longer needed at this point.
for k, (_, row) in enumerate(log.iterrows()):
    activity_name = row['concept:name']
    activity_id = int(activity_name_to_server_id_mapping[str(activity_name)])
    case_name = str(row['case:concept:name'])
    case_name_next_event = str(row['case_next_event'])
    # start event
    # A change of case id marks the first event of a new trace: reset the
    # per-trace match/mismatch counters and clear the predecessor.
    if case_name != current_case:
        mismatches = 0
        matches = 0
        start_of_trace = True
        pred = None
    # not start event
    else:
        start_of_trace = False
        # Predecessor is the previous row positionally; valid because the
        # frame was sorted by (case, timestamp) above.
        pred_name = log.iloc[k-1]['concept:name']
        pred = int(activity_name_to_server_id_mapping[str(pred_name)])
    # check whether it is an end event
    # End of trace = last row of the frame, or next row belongs to another case.
    if k == int(log.shape[0]) - 1 or case_name != case_name_next_event:
        if k != int(log.shape[0]-1):
            print(f"case ID: {case_name} next case: {case_name_next_event} activity {server_id_to_activity_name_mapping[str(activity_id)]}")
        end_of_trace = True
    else:
        end_of_trace = False
    # trigger event at activity node
    # The node updates and returns the running match/mismatch counters.
    mismatches, matches = get_node_by_id(nodes, activity_id).trigger_event(activity_id, case_id, start_of_trace, end_of_trace, pred, mismatches, matches)
    # Running fitness after every single event.
    fitness.append(matches/(matches+mismatches))
    # calculate fitness after each trace
    if end_of_trace:
        calculate_conformance_full_log(case_id)
        case_id += 1
    current_case = case_name
# print(fitness)