-
Notifications
You must be signed in to change notification settings - Fork 101
CMR-10600: Split cluster feature #2349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 43 commits
88fcce6
ca785c7
5d62130
601191d
c391ac8
62d0a13
759a710
2553fd5
cc7ccee
5226fef
cb0a5cb
673c3c5
3c96a95
12b5474
404025d
b728aa5
94e500f
f881cc9
07ac650
dd1a571
22baed1
c4e83c9
50d1e6b
73bd39f
4a13a91
1dd3809
6185eb9
ec9f062
5cf21aa
752cc41
870bd1e
8d7c1ac
1b57707
c1d4473
3f53bd2
7c1619e
89e729c
2c46b5e
78d6c40
1658e1b
f07e5dd
753c9b3
99a08db
384e7bc
fd3b583
a8ae351
cb9544d
25132aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| [cmr.common.concepts :as cc] | ||
| [cmr.common.log :refer (info warn error)] | ||
| [cmr.common.util :as util] | ||
| [cmr.elastic-utils.config :as es-config] | ||
| [cmr.elastic-utils.es-helper :as es-helper] | ||
| [cmr.indexer.indexer-util :as indexer-util] | ||
| [cmr.indexer.data.index-set :as index-set] | ||
|
|
@@ -64,19 +65,19 @@ | |
| (db/get-concept (helper/get-metadata-db-db (:system context)) :collection provider collection-id)) | ||
|
|
||
| (defn migrate-index | ||
| "Copy the contents of one index to another." | ||
| [system source-index target-index] | ||
| (info (format "Migrating from index [%s] to index [%s]" source-index target-index)) | ||
| "Copy the contents of one index to another. The target index is assumed to be in the same elastic cluster as the source index." | ||
| [system source-index target-index elastic-name] | ||
| (info (format "Migrating from index [%s] to index [%s] in es cluster [%s]" source-index target-index elastic-name)) | ||
| (let [indexer-context {:system (helper/get-indexer system)} | ||
| conn (indexer-util/context->conn indexer-context)] | ||
| conn (indexer-util/context->conn indexer-context elastic-name)] | ||
| (try | ||
| (let [result (es-helper/migrate-index conn source-index target-index)] | ||
| (when (:error result) | ||
| (throw (ex-info "Migration failed" {:source source-index :target target-index :error result})))) | ||
| (index-set-service/update-resharding-status indexer-context index-set/index-set-id source-index "COMPLETE") | ||
| (index-set-service/update-resharding-status indexer-context index-set/index-set-id source-index "COMPLETE" elastic-name) | ||
| (catch Throwable e | ||
| (error e (format "Migration from [%s] to [%s] failed: %s" source-index target-index (.getMessage e))) | ||
| (index-set-service/update-resharding-status indexer-context index-set/index-set-id source-index "FAILED") | ||
| (index-set-service/update-resharding-status indexer-context index-set/index-set-id source-index "FAILED" elastic-name) | ||
| (throw e))))) | ||
|
|
||
| (defn index-granules-for-collection | ||
|
|
@@ -92,6 +93,7 @@ | |
| concept-batches (db/find-concepts-in-batches db provider params (:db-batch-size system) start-index) | ||
| num-granules (index/bulk-index {:system (helper/get-indexer system)} | ||
| concept-batches | ||
| es-config/gran-elastic-name | ||
| {:target-index-key target-index-key})] | ||
| (info "Indexed" num-granules "granule(s) for provider" provider-id "collection" collection-id) | ||
| (when completion-message | ||
|
|
@@ -113,7 +115,10 @@ | |
| params {:concept-type :granule | ||
| :provider-id provider-id} | ||
| concept-batches (db/find-concepts-in-batches db provider params (:db-batch-size system) start-index) | ||
| num-granules (index/bulk-index {:system (helper/get-indexer system)} concept-batches {})] | ||
| num-granules (index/bulk-index {:system (helper/get-indexer system)} | ||
| concept-batches | ||
| es-config/gran-elastic-name | ||
| {})] | ||
| (info "Indexed" num-granules "granule(s) for provider" provider-id) | ||
| num-granules)) | ||
|
|
||
|
|
@@ -125,7 +130,10 @@ | |
| params {:concept-type :collection | ||
| :provider-id provider-id} | ||
| concept-batches (db/find-concepts-in-batches db provider params (:db-batch-size system)) | ||
| num-collections (index/bulk-index {:system (helper/get-indexer system)} concept-batches {})] | ||
| num-collections (index/bulk-index {:system (helper/get-indexer system)} | ||
| concept-batches | ||
| es-config/elastic-name | ||
| {})] | ||
| (info "Indexed" num-collections "collection(s) for provider" provider-id) | ||
| num-collections)) | ||
|
|
||
|
|
@@ -145,10 +153,10 @@ | |
|
|
||
| (defn- bulk-index-concept-batches | ||
| "Bulk index the given concept batches in both regular index and all revisions index." | ||
| [system concept-batches] | ||
| [system concept-batches es-cluster-name] | ||
| (let [indexer-context {:system (helper/get-indexer system)}] | ||
| (index/bulk-index indexer-context concept-batches {:all-revisions-index? true}) | ||
| (index/bulk-index indexer-context concept-batches {}))) | ||
| (index/bulk-index indexer-context concept-batches es-cluster-name {:all-revisions-index? true}) | ||
| (index/bulk-index indexer-context concept-batches es-cluster-name {}))) | ||
|
|
||
| (defn- index-concepts-by-provider | ||
| "Bulk index concepts for the given provider and concept-type." | ||
|
|
@@ -165,7 +173,10 @@ | |
| db provider | ||
| params | ||
| (:db-batch-size system)) | ||
| num-concepts (bulk-index-concept-batches system concept-batches) | ||
| es-cluster-name (if (= concept-type :granule) | ||
| es-config/gran-elastic-name | ||
| es-config/elastic-name) | ||
| num-concepts (bulk-index-concept-batches system concept-batches es-cluster-name) | ||
| msg (format "Indexing of %s %s revisions for provider %s completed." | ||
| num-concepts | ||
| (name concept-type) | ||
|
|
@@ -190,14 +201,14 @@ | |
| (defn- index-access-control-concepts | ||
| "Bulk index ACLs or access groups" | ||
| [system concept-batches] | ||
| (info "Indexing concepts") | ||
| (ac-bulk-index/bulk-index-with-revision-date {:system (helper/get-indexer system)} concept-batches)) | ||
| (info "Indexing access control concepts") | ||
| (ac-bulk-index/bulk-index-with-revision-date {:system (helper/get-indexer system)} concept-batches es-config/elastic-name)) | ||
|
|
||
| (defn- index-concepts | ||
| "Bulk index the given concepts using the indexer-app" | ||
| [system concept-batches] | ||
| [system concept-batches es-cluster-name] | ||
| (info "Indexing concepts") | ||
| (index/bulk-index-with-revision-date {:system (helper/get-indexer system)} concept-batches)) | ||
| (index/bulk-index-with-revision-date {:system (helper/get-indexer system)} concept-batches es-cluster-name)) | ||
|
|
||
| (defn- fetch-and-index-new-concepts | ||
| "Get batches of concepts for a given provider/concept type that have a revision-date | ||
|
|
@@ -212,10 +223,13 @@ | |
| (dissoc params :provider-id) | ||
| params) | ||
| concept-batches (db/find-concepts-in-batches | ||
| db provider params (:db-batch-size system)) | ||
| db provider params (:db-batch-size system)) | ||
| es-cluster-name (if (= concept-type :granule) | ||
| es-config/gran-elastic-name | ||
| es-config/elastic-name) | ||
| {:keys [max-revision-date num-indexed]} (if (contains? #{:acl :access-group} concept-type) | ||
| (index-access-control-concepts system concept-batches) | ||
| (index-concepts system concept-batches))] | ||
| (index-access-control-concepts system concept-batches) | ||
| (index-concepts system concept-batches es-cluster-name))] | ||
|
|
||
| (info (format (str "Indexed %d %s(s) for provider %s with revision-date later than %s and max " | ||
| "revision date was %s.") | ||
|
|
@@ -241,7 +255,7 @@ | |
| (:db-batch-size system) | ||
| start-index)]] | ||
| (:num-indexed (if (= concept-type :tag) | ||
| (index-concepts system concept-batches) | ||
| (index-concepts system concept-batches es-config/elastic-name) | ||
| (index-access-control-concepts system concept-batches)))))] | ||
| (info "Indexed" total "system concepts.") | ||
| total)) | ||
|
|
@@ -261,12 +275,15 @@ | |
| {:concept-type concept-type :concept-id batch} | ||
| (:db-batch-size system))] | ||
| concept-batch) | ||
| total (index/bulk-index {:system (helper/get-indexer system)} concept-batches)] | ||
| es-cluster-name (if (= :granule concept-type) | ||
| es-config/gran-elastic-name | ||
| es-config/elastic-name) | ||
| total (index/bulk-index {:system (helper/get-indexer system)} concept-batches es-cluster-name)] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume the first parameter is a context, and it is now repeated (and I see it below as well). Can you make this a local function in case we have to add something to all of them latter we can do it once?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure if this warrants a separate func since it just calls another func to return a var |
||
|
|
||
| ;; for concept types that have all revisions index, also index the all revisions index | ||
| (when-not (#{:tag :granule} concept-type) | ||
| (index/bulk-index | ||
| {:system (helper/get-indexer system)} concept-batches {:all-revisions-index? true})) | ||
| {:system (helper/get-indexer system)} concept-batches es-config/elastic-name {:all-revisions-index? true})) | ||
|
|
||
| (info "Indexed " total " concepts.") | ||
| total)) | ||
|
|
@@ -279,7 +296,7 @@ | |
| [system _ _ concept-ids] | ||
| (let [query {:terms {:concept-id concept-ids}} | ||
| indexer-context {:system (helper/get-indexer system)}] | ||
| (es-helper/delete-by-query (indexer-util/context->conn indexer-context) "_all" "granule" query))) | ||
| (es-helper/delete-by-query (indexer-util/context->conn indexer-context es-config/gran-elastic-name) "_all" "granule" query))) | ||
|
|
||
| (defmethod delete-concepts-by-id :default | ||
| [system provider-id concept-type concept-ids] | ||
|
|
@@ -295,7 +312,10 @@ | |
| {:concept-type concept-type :concept-id batch} | ||
| (:db-batch-size system))] | ||
| (map #(assoc % :deleted true) concept-batch)) | ||
| total (index/bulk-index {:system (helper/get-indexer system)} concept-batches)] | ||
| es-cluster-name (if (= concept-type :granule) | ||
| es-config/gran-elastic-name | ||
| es-config/elastic-name) | ||
| total (index/bulk-index {:system (helper/get-indexer system)} concept-batches es-cluster-name)] | ||
| (info "Deleted " total " concepts") | ||
| total)) | ||
|
|
||
|
|
@@ -409,7 +429,7 @@ | |
| (let [channel (:migrate-index-channel core-async-dispatcher)] | ||
| (async/thread (while true | ||
| (try ; log errors but keep the thread alive) | ||
| (let [{:keys [source-index target-index]} (<!! channel)] | ||
| (migrate-index system source-index target-index)) | ||
| (let [{:keys [source-index target-index elastic-name]} (<!! channel)] | ||
| (migrate-index system source-index target-index elastic-name)) | ||
| (catch Throwable e | ||
| (error e (.getMessage e))))))))) | ||
Uh oh!
There was an error while loading. Please reload this page.