20
20
import java .util .ArrayList ;
21
21
import java .util .Collection ;
22
22
import java .util .Collections ;
23
+ import java .util .Comparator ;
23
24
import java .util .List ;
25
+ import java .util .stream .Collectors ;
24
26
25
27
import org .apache .commons .lang3 .StringUtils ;
26
28
import org .apache .shardingsphere .elasticjob .api .JobConfiguration ;
29
+ import org .apache .shardingsphere .elasticjob .kernel .internal .config .ConfigurationService ;
30
+ import org .apache .shardingsphere .elasticjob .kernel .internal .failover .FailoverService ;
27
31
import org .apache .shardingsphere .elasticjob .kernel .internal .instance .InstanceService ;
28
32
import org .apache .shardingsphere .elasticjob .kernel .internal .schedule .JobRegistry ;
33
+ import org .apache .shardingsphere .elasticjob .kernel .internal .sharding .ExecutionContextService ;
34
+ import org .apache .shardingsphere .elasticjob .kernel .internal .sharding .ExecutionService ;
29
35
import org .apache .shardingsphere .elasticjob .kernel .internal .sharding .JobInstance ;
36
+ import org .apache .shardingsphere .elasticjob .kernel .internal .sharding .ShardingService ;
30
37
import org .apache .shardingsphere .elasticjob .kernel .internal .storage .JobNodeStorage ;
31
38
import org .apache .shardingsphere .elasticjob .kernel .tracing .config .TracingConfiguration ;
39
+ import org .apache .shardingsphere .elasticjob .kernel .tracing .event .JobTracingEventBus ;
32
40
import org .apache .shardingsphere .elasticjob .reg .base .CoordinatorRegistryCenter ;
33
41
import org .apache .shardingsphere .elasticjob .spi .listener .ElasticJobListener ;
34
42
import org .apache .shardingsphere .elasticjob .spi .listener .param .ShardingContexts ;
40
48
*/
41
49
@ Slf4j
42
50
public final class SingleShardingJobFacade extends AbstractJobFacade {
43
-
51
+
52
+ private final ConfigurationService configService ;
53
+
54
+ private final ShardingService shardingService ;
55
+
56
+ private final ExecutionContextService executionContextService ;
57
+
58
+ private final ExecutionService executionService ;
59
+
60
+ private final FailoverService failoverService ;
61
+
62
+ private final Collection <ElasticJobListener > elasticJobListeners ;
63
+
64
+ private final JobTracingEventBus jobTracingEventBus ;
65
+
44
66
private final JobNodeStorage jobNodeStorage ;
67
+
45
68
private final InstanceService instanceService ;
46
-
47
- public SingleShardingJobFacade (final CoordinatorRegistryCenter regCenter , final String jobName , final Collection <ElasticJobListener > elasticJobListeners , final TracingConfiguration <?> tracingConfig ) {
69
+
70
+ public SingleShardingJobFacade (final CoordinatorRegistryCenter regCenter , final String jobName , final Collection <ElasticJobListener > elasticJobListeners ,
71
+ final TracingConfiguration <?> tracingConfig ) {
48
72
super (regCenter , jobName , elasticJobListeners , tracingConfig );
49
-
73
+
74
+ configService = new ConfigurationService (regCenter , jobName );
75
+ shardingService = new ShardingService (regCenter , jobName );
76
+ executionContextService = new ExecutionContextService (regCenter , jobName );
77
+ executionService = new ExecutionService (regCenter , jobName );
78
+ failoverService = new FailoverService (regCenter , jobName );
79
+ this .elasticJobListeners = elasticJobListeners .stream ().sorted (Comparator .comparingInt (ElasticJobListener ::order )).collect (Collectors .toList ());
80
+ this .jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus () : new JobTracingEventBus (tracingConfig );
50
81
jobNodeStorage = new JobNodeStorage (regCenter , jobName );
51
82
instanceService = new InstanceService (regCenter , jobName );
52
83
}
53
-
84
+
54
85
@ Override
55
86
public void registerJobCompleted (final ShardingContexts shardingContexts ) {
56
87
super .registerJobCompleted (shardingContexts );
57
-
88
+
58
89
JobConfiguration jobConfig = configService .load (true );
59
90
JobInstance jobInst = JobRegistry .getInstance ().getJobInstance (jobConfig .getJobName ());
60
91
if (null == jobInst ) {
@@ -66,21 +97,21 @@ public void registerJobCompleted(final ShardingContexts shardingContexts) {
66
97
for (int i = 0 ; i < availJobInst .size (); i ++) {
67
98
JobInstance temp = availJobInst .get (i );
68
99
if (temp .getServerIp ().equals (jobInst .getServerIp ())) {
69
- nextIndex = i + 1 ; // find the current running job instance, and set next one to current index + 1
100
+ nextIndex = i + 1 ;
70
101
break ;
71
102
}
72
103
}
73
- if (nextIndex != null ) { // the normal case that can find the next index, exclude the bounded scenarios
74
- nextIndex = nextIndex >= availJobInst .size () ? 0 : nextIndex ; // Round Robin Loop
104
+ if (nextIndex != null ) {
105
+ nextIndex = nextIndex >= availJobInst .size () ? 0 : nextIndex ;
75
106
jobNodeStorage .fillEphemeralJobNode ("next-job-instance-ip" , availJobInst .get (nextIndex ).getServerIp ());
76
107
}
77
-
108
+
78
109
if (log .isDebugEnabled ()) {
79
110
log .debug ("job name: {}, next index: {}, sharding total count: {}" ,
80
- jobConfig .getJobName (), nextIndex , jobConfig .getShardingTotalCount ());
111
+ jobConfig .getJobName (), nextIndex , jobConfig .getShardingTotalCount ());
81
112
}
82
113
}
83
-
114
+
84
115
/**
85
116
* Get sharding contexts.
86
117
*
@@ -96,32 +127,32 @@ public ShardingContexts getShardingContexts() {
96
127
return executionContextService .getJobShardingContext (failoverShardingItems );
97
128
}
98
129
}
99
-
130
+
100
131
List <Integer > shardingItems ;
101
132
String nextJobInstIP = null ;
102
- if (isNeedSharding ()) { // the first initialization or reconcile case
133
+ if (isNeedSharding ()) {
103
134
shardingService .shardingIfNecessary ();
104
135
shardingItems = shardingService .getLocalShardingItems ();
105
136
} else {
106
137
nextJobInstIP = jobNodeStorage .getJobNodeDataDirectly ("next-job-instance-ip" );
107
- if (StringUtils .isBlank (nextJobInstIP )) { // if there is no next job instance ip
138
+ if (StringUtils .isBlank (nextJobInstIP )) {
108
139
shardingService .shardingIfNecessary ();
109
140
shardingItems = shardingService .getLocalShardingItems ();
110
- } else { // when next job instance is specified under normal case
141
+ } else {
111
142
JobInstance jobInst = JobRegistry .getInstance ().getJobInstance (jobConfig .getJobName ());
112
143
shardingItems = nextJobInstIP .equals (jobInst .getServerIp ()) ? Collections .singletonList (0 ) : new ArrayList <>();
113
144
}
114
145
}
115
146
if (log .isDebugEnabled ()) {
116
147
log .debug ("job name: {}, sharding items: {}, nextJobInstIP: {}, sharding total count: {}, isFailover: {}" ,
117
- jobConfig .getJobName (), shardingItems , nextJobInstIP , jobConfig .getShardingTotalCount (), isFailover );
148
+ jobConfig .getJobName (), shardingItems , nextJobInstIP , jobConfig .getShardingTotalCount (), isFailover );
118
149
}
119
-
150
+
120
151
if (isFailover ) {
121
152
shardingItems .removeAll (failoverService .getLocalTakeOffItems ());
122
153
}
123
154
shardingItems .removeAll (executionService .getDisabledItems (shardingItems ));
124
155
return executionContextService .getJobShardingContext (shardingItems );
125
156
}
126
-
157
+
127
158
}
0 commit comments