PHOENIX-7459 : Bootstrap stream metadata when CDC is enabled on a table #2033
+1, pending clean build results
phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
.setMessage(ioe.getMessage())
.setSchemaName(SYSTEM_SCHEMA_NAME)
.setTableName(SYSTEM_TASK_TABLE).build().buildException();
}
Shouldn't we do this before creating the CDC object itself to be technically correct and avoid the window when the CDC is active but the stream is not? Also, how do we plan to synchronize this with the ongoing splits and merges?
@haridsv PHOENIX-7460 for keeping region metadata in sync.
When will the DROP CDC be hooked up to change the stream status?
@haridsv Since DROP CDC needs more changes, like not dropping the index but only stopping the writes to the index, I was thinking we can do that part as a separate JIRA. I can create another subtask under PHOENIX-7456, @virajjasani wdyt?
} catch (Throwable t) {
LOGGER.error("Exception while bootstrapping CDC Stream Partition Metadata for "
+ taskRecord.getTableName() + " and timestamp " + timestamp.toString(), t);
return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
TODO: Consider a RETRY strategy here.
@haridsv @virajjasani I am not sure which kinds of failures should cause the task to be retried here?
I think it is safe to retry on all SQLException scenarios, which can only happen when there is an issue with the system table availability.
done
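A minimal sketch of the retry rule agreed on in this thread, assuming the task maps SQLException to a retryable result and every other Throwable to a failure. ResultCode, BootstrapOutcome and RetryClassifierSketch are hypothetical stand-ins for illustration only; they are not the Phoenix TaskRegionObserver types, and the actual change in the PR may differ.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Hypothetical stand-in for a task result code; not the Phoenix enum.
enum ResultCode { SUCCESS, RETRY, FAIL }

// Hypothetical stand-in for a task result (code plus details).
final class BootstrapOutcome {
    final ResultCode code;
    final String details;

    BootstrapOutcome(ResultCode code, String details) {
        this.code = code;
        this.details = details;
    }
}

class RetryClassifierSketch {
    // Runs the bootstrap step and classifies the outcome:
    // SQLException -> RETRY (assumed to indicate system table unavailability),
    // any other Throwable -> FAIL.
    static BootstrapOutcome run(Callable<Void> bootstrapStep) {
        try {
            bootstrapStep.call();
            return new BootstrapOutcome(ResultCode.SUCCESS, "");
        } catch (SQLException e) {
            return new BootstrapOutcome(ResultCode.RETRY, e.toString());
        } catch (Throwable t) {
            return new BootstrapOutcome(ResultCode.FAIL, t.toString());
        }
    }

    public static void main(String[] args) {
        BootstrapOutcome outcome = run(() -> {
            throw new SQLException("system table unavailable");
        });
        System.out.println(outcome.code + ": " + outcome.details);
    }
}
```

Ordering the catch blocks from the specific SQLException to the general Throwable keeps the existing FAIL path as the fallback.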
STREAM_NAME + " VARCHAR NOT NULL," +
// Non-PK columns
STREAM_NAME + " VARCHAR,\n" +
STREAM_STATUS + " VARCHAR,\n" +
@palashc btw the query pattern is going to be:
Step 1: Get the active table stream from the metadata.
SELECT STREAM_NAME FROM SYSTEM.CDC_STREAM_STATUS WHERE TABLE_NAME = <table-name> AND STREAM_STATUS = 'ACTIVE'
How would keeping STREAM_NAME help here? I missed this earlier.
@virajjasani I changed it because the update pattern is
- UPSERT (table_name, stream_name, ENABLING)
- UPSERT (table_name, stream_name, ENABLED)
For querying, since there should only be one stream per table, the query should still be efficient?
SELECT STREAM_NAME FROM SYSTEM.CDC_STREAM_STATUS WHERE TABLE_NAME = <table-name> AND STREAM_STATUS = 'ENABLED'
Also, because you can have multiple streams in the same state; see my latest comment on the corresponding thread in the design doc.
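To make the key-design argument in this thread concrete, here is a small in-memory sketch. It assumes the status table is keyed by (TABLE_NAME, STREAM_NAME) with STREAM_STATUS as a value column; StreamStatusSketch and its methods are hypothetical and only model the row semantics, not the Phoenix table or its SQL. The table and stream names in main are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class StreamStatusSketch {
    // tableName -> (streamName -> status); models a (TABLE_NAME, STREAM_NAME) primary key.
    private final Map<String, Map<String, String>> rows = new TreeMap<>();

    // Models UPSERT (table_name, stream_name, status): the same key overwrites the status.
    void upsert(String tableName, String streamName, String status) {
        rows.computeIfAbsent(tableName, t -> new TreeMap<>()).put(streamName, status);
    }

    // Models SELECT STREAM_NAME ... WHERE TABLE_NAME = ? AND STREAM_STATUS = ?:
    // only the rows under one table name are examined.
    List<String> streamsWithStatus(String tableName, String status) {
        List<String> result = new ArrayList<>();
        Map<String, String> streams =
                rows.getOrDefault(tableName, Collections.<String, String>emptyMap());
        for (Map.Entry<String, String> e : streams.entrySet()) {
            if (e.getValue().equals(status)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        StreamStatusSketch table = new StreamStatusSketch();
        table.upsert("T1", "stream-a", "ENABLING");
        table.upsert("T1", "stream-a", "ENABLED"); // same key, status updated in place
        System.out.println(table.streamsWithStatus("T1", "ENABLED"));
    }
}
```

With the stream name in the key, the ENABLING to ENABLED transition touches a single row, and several streams for one table can coexist without overwriting each other.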
> Since DROP CDC needs more changes like not dropping the index but only stopping the writes to the index, I was thinking we can do that part as a separate JIRA.
I feel we should include the DROP CDC change here to complete the stream status handling, so that the new test, which ensures only one active CDC can exist at any time, can be extended to cover creating a new CDC after dropping the existing one.
The follow up PR can then focus solely on reusing a single index instead of creating one per CDC. WDYT?
statement.getDataTable().getTableName());

// for now, only track stream partition metadata for tables, TODO: updatable views
if (PTableType.TABLE.equals(dataTable.getType())) {
nit: with enums you can use an == check instead of equals().
Suggested change:
- if (PTableType.TABLE.equals(dataTable.getType())) {
+ if (PTableType.TABLE == dataTable.getType()) {
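For context on this nit, a short sketch of why the two forms behave the same for enum constants. TableKind is a hypothetical stand-in for PTableType; nothing here depends on Phoenix.

```java
// Hypothetical stand-in for PTableType, for illustration only.
enum TableKind { TABLE, VIEW }

class EnumEqualitySketch {
    // Constant-first equals(): never throws, even when kind is null.
    static boolean isTableWithEquals(TableKind kind) {
        return TableKind.TABLE.equals(kind);
    }

    // Identity comparison: also null-safe, and the compiler rejects
    // comparisons between unrelated types.
    static boolean isTableWithIdentity(TableKind kind) {
        return TableKind.TABLE == kind;
    }

    public static void main(String[] args) {
        for (TableKind kind : new TableKind[] { TableKind.TABLE, TableKind.VIEW, null }) {
            System.out.println(kind + " -> equals: " + isTableWithEquals(kind)
                    + ", identity: " + isTableWithIdentity(kind));
        }
    }
}
```

Each enum constant is a singleton, so == and equals() agree; the identity form additionally gets compile-time type checking.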
DROP CDC can be a separate sub-task IMO. Keeping individual sub-tasks makes it much simpler to follow the change history.
Though if the PR changes are manageable as a single sub-task, we can include them here as well. Not a problem. It's up to you @palashc.
.setTaskType(PTable.TaskType.CDC_STREAM_PARTITION)
.setTableName(tableName) //give full table name
.setSchemaName(streamName) // use schemaName to pass streamName
.build());
Shouldn't we set the status to CREATED?
.setStartTs(taskRecord.getTimeStamp())
.setEndTs(null)
.build());
LOGGER.error("Marking task as RETRY. " +
Shouldn't this be a warning, since we are not giving up yet?
try {
pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
List<HRegionLocation> tableRegions = pconn.getQueryServices().getAllTableRegions(
tableName.getBytes(), getTableRegionsTimeout);
If there is a timeout due to the table having a large number of regions, it may continue to time out even in retries. How about we query for the PARTITION_END_KEY of the last partition and use getTableRegions to start from that key? We could load any existing partition records for this stream first (which will be empty the first time, so we will default to HConstants.EMPTY_START_ROW), sort them and get the highest end key. The end key for getTableRegions would always be HConstants.EMPTY_START_ROW.
We can use the existing method that is already used by the getAllTableRegions() API.
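The resume idea from this thread could be sketched as follows. This is a hypothetical helper, not code from the PR: it assumes the already-recorded partitions are contiguous from the start of the table, that keys compare as unsigned bytes (as HBase row keys do), and that an empty end key marks the last region. ResumeKeySketch and resumeStartKey are made-up names, and Arrays.compareUnsigned requires Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;

class ResumeKeySketch {
    // Plays the role of HConstants.EMPTY_START_ROW (an empty byte array).
    static final byte[] EMPTY = new byte[0];

    // Returns the key from which region listing should resume, or null when
    // the recorded partitions already reach the end of the table.
    static byte[] resumeStartKey(List<byte[]> recordedEndKeys) {
        byte[] highest = EMPTY; // no records yet: start from the beginning
        for (byte[] endKey : recordedEndKeys) {
            if (endKey.length == 0) {
                return null; // empty end key: the last region is already recorded
            }
            if (Arrays.compareUnsigned(endKey, highest) > 0) {
                highest = endKey;
            }
        }
        return highest;
    }

    public static void main(String[] args) {
        List<byte[]> recorded = Arrays.asList(new byte[] { 0x10 }, new byte[] { 0x20, 0x01 });
        System.out.println(Arrays.toString(resumeStartKey(recorded)));
    }
}
```

A retry would then pass the returned key as the start key when listing regions, so regions that were already recorded are not fetched again.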
ps.executeUpdate();
connection.commit();
LOGGER.info("Marked stream {} for table {} as DISABLED", streamName, parentTableName);
}
We should do this after the CDC drop is successful, maybe even after the index drop.
Jira: PHOENIX-7459