5
5
import luigi .task
6
6
from luigi import configuration
7
7
8
- from edx .analytics .tasks .util .url import get_target_class_from_url , url_path_join
8
+ from edx .analytics .tasks .util .url import get_target_class_from_url , get_target_from_url , url_path_join
9
9
10
10
CONFIG_SECTION = 'manifest'
11
11
@@ -27,6 +27,13 @@ def convert_to_manifest_input_if_necessary(manifest_id, targets):
27
27
return targets
28
28
29
29
30
def get_manifest_file_path(manifest_id):
    """Return the URL of the manifest file associated with `manifest_id`.

    The base URL is read from the `path` option of the configuration section named by CONFIG_SECTION, and the
    file name `<manifest_id>.manifest` is joined onto it.
    """
    config = configuration.get_config()
    manifest_root = config.get(CONFIG_SECTION, 'path')
    return url_path_join(manifest_root, manifest_id + '.manifest')
35
+
36
+
30
37
def create_manifest_target (manifest_id , targets ):
31
38
# If we are running locally, we need our manifest file to be a local file target, however, if we are running on
32
39
# a real Hadoop cluster, it has to be an HDFS file so that the input format can read it. Luigi makes it a little
@@ -35,8 +42,7 @@ def create_manifest_target(manifest_id, targets):
35
42
# base class at runtime based on the URL of the manifest file.
36
43
37
44
# Construct the manifest file URL from the manifest_id and the configuration
38
- base_url = configuration .get_config ().get (CONFIG_SECTION , 'path' )
39
- manifest_file_path = url_path_join (base_url , manifest_id + '.manifest' )
45
+ manifest_file_path = get_manifest_file_path (manifest_id )
40
46
41
47
# Figure out the type of target that should be used to write/read the file.
42
48
manifest_file_target_class , init_args , init_kwargs = get_target_class_from_url (manifest_file_path )
@@ -49,6 +55,16 @@ class ManifestInputTarget(ManifestInputTargetMixin, manifest_file_target_class):
49
55
return ManifestInputTarget .from_existing_targets (targets , * init_args , ** init_kwargs )
50
56
51
57
58
def remove_manifest_target_if_exists(manifest_id):
    """Remove the manifest file belonging to `manifest_id` if it currently exists; otherwise do nothing."""
    # A plain target is enough here: checking for existence and removing the file do not need the manifest mixin.
    target = get_target_from_url(get_manifest_file_path(manifest_id))
    if not target.exists():
        return
    log.info('Removing existing manifest found at %s', target.path)
    target.remove()
66
+
67
+
52
68
class ManifestInputTargetMixin (object ):
53
69
54
70
def __init__ (self , * args , ** kwargs ):
@@ -66,7 +82,7 @@ def __init__(self, *args, **kwargs):
66
82
def from_existing_targets (cls , other_targets , * init_args , ** init_kwargs ):
67
83
manifest_target = cls (* init_args , ** init_kwargs )
68
84
if not manifest_target .exists ():
69
- log .debug ('Writing manifest file %s' , manifest_target .path )
85
+ log .info ('Writing manifest file %s' , manifest_target .path )
70
86
with manifest_target .open ('w' ) as manifest_file :
71
87
for target in other_targets :
72
88
manifest_file .write (target .path )
0 commit comments