use a predefined idempotent id before retry#37
Merged
Conversation
```
(base) ➜ sling-cli git:(main) ✗ ./sling run --src-conn LOCAL --tgt-conn PROTON --tgt-object "ge8" --tgt-options '{format: "csv", delimiter: "~", head: true}' --src-stream "file://./re.csv" --mode incremental --update-key _tp_time -d
2024-10-26 19:46:32 DBG opened "file" connection (conn-file-gof)
2024-10-26 19:46:32 DBG Force SLING_PROCESS_BW to false for timeplus database
2024-10-26 19:46:32 DBG Sling version: dev (linux amd64)
2024-10-26 19:46:32 DBG type is file-db
2024-10-26 19:46:32 DBG using: {"columns":null,"mode":"incremental","transforms":null}
2024-10-26 19:46:32 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"fields_per_rec":-1,"compression":"auto","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"max_decimals":11}
2024-10-26 19:46:32 DBG using target options: {"batch_limit":50000,"datetime_format":"auto","delimiter":"~","file_max_rows":0,"format":"csv","max_decimals":11,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-10-26 19:46:32 DBG opened "proton" connection (conn-proton-jHh)
2024-10-26 19:46:32 INF connecting to target database (proton)
2024-10-26 19:46:32 INF getting checkpoint value
2024-10-26 19:46:32 DBG select max(`_tp_time`) as max_val from table(`default`.`ge8`)
2024-10-26 19:46:32 INF reading from source file system (file)
2024-10-26 19:46:32 DBG file stream using incremental_val=time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) and update_key=_tp_time
2024-10-26 19:46:32 DBG opened "file" connection (conn-file-ggp)
2024-10-26 19:46:32 DBG reading single datastream from file://./re.csv [format=csv]
2024-10-26 19:46:32 DBG merging csv readers of 1 files [concurrency=3] from file://./re.csv
2024-10-26 19:46:32 DBG processing reader from file://./re.csv
2024-10-26 19:46:32 DBG delimiter auto-detected: "~"
2024-10-26 19:46:32 DBG casting column 'id' as 'bigint'
2024-10-26 19:46:32 DBG casting column '_tp_time' as 'datetime'
2024-10-26 19:46:32 INF writing to target database [mode: incremental]
2024-10-26 19:46:32 INF streaming data (direct insert)
2024-10-26 19:46:32 DBG use `default`
2024-10-26 19:46:32 WRN Batch 1 failed, retrying in 1.201935408s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1729997192450937240_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.907308681987399 of disk default exceeds the max_disk_util 0.9
2024-10-26 19:46:33 WRN Batch 1 failed, retrying in 1.88363827s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1729997192450937240_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9073087271520056 of disk default exceeds the max_disk_util 0.9
2024-10-26 19:46:35 WRN Batch 1 failed, retrying in 2.445018854s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1729997192450937240_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9073044693613656 of disk default exceeds the max_disk_util 0.9
2024-10-26 19:46:38 WRN Batch 1 failed, retrying in 10.93827517s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1729997192450937240_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9073044816789856 of disk default exceeds the max_disk_util 0.9
^C
interrupting...
2024-10-26 19:46:43 DBG opened "proton" connection (conn-proton-R6J)
2024-10-26 19:46:43 DBG closed "proton" connection (conn-proton-R6J)
2024-10-26 19:46:43 DBG closed "proton" connection (conn-proton-jHh)
2024-10-26 19:46:48 WRN Batch 1 failed, retrying in 13.125867356s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1729997192450937240_1_batch_`default`.`ge8`' values ($1, $2)
context canceled
2024-10-26 19:46:48 INF execution failed
fatal:
--- proc.go:271 main ---
--- sling_cli.go:473 main ---
--- sling_cli.go:509 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:225 processRun ---
~ failure running task (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:396 runTask ---
--- task_run.go:151 Execute ---
--- proc.go:271 main ---
--- sling_cli.go:473 main ---
--- sling_cli.go:509 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:225 processRun ---
--- sling_run.go:396 runTask ---
--- task_run.go:137 Execute ---
Execution interrupted
```
```
(base) ➜ sling-cli git:(bugfix/correct-retry-with-idempotent-id) ✗ ./sling run --src-conn LOCAL --tgt-conn PROTON --tgt-object "ge8" --tgt-options '{format: "csv", delimiter: "~", head: true}' --src-stream "file://./re.csv" --mode incremental --update-key _tp_time -d
2024-10-26 21:41:46 DBG opened "file" connection (conn-file-NVP)
2024-10-26 21:41:46 DBG Force SLING_PROCESS_BW to false for timeplus database
2024-10-26 21:41:46 DBG Sling version: dev (linux amd64)
2024-10-26 21:41:46 DBG type is file-db
2024-10-26 21:41:46 DBG using: {"columns":null,"mode":"incremental","transforms":null}
2024-10-26 21:41:46 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"fields_per_rec":-1,"compression":"auto","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"max_decimals":11}
2024-10-26 21:41:46 DBG using target options: {"batch_limit":50000,"datetime_format":"auto","delimiter":"~","file_max_rows":0,"format":"csv","max_decimals":11,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-10-26 21:41:46 DBG opened "proton" connection (conn-proton-wTy)
2024-10-26 21:41:46 INF connecting to target database (proton)
2024-10-26 21:41:46 INF getting checkpoint value
2024-10-26 21:41:46 DBG select max(`_tp_time`) as max_val from table(`default`.`ge8`)
2024-10-26 21:41:46 INF reading from source file system (file)
2024-10-26 21:41:46 DBG file stream using incremental_val=time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) and update_key=_tp_time
2024-10-26 21:41:46 DBG opened "file" connection (conn-file-spw)
2024-10-26 21:41:46 DBG reading single datastream from file://./re.csv [format=csv]
2024-10-26 21:41:46 DBG merging csv readers of 1 files [concurrency=3] from file://./re.csv
2024-10-26 21:41:46 DBG processing reader from file://./re.csv
2024-10-26 21:41:46 DBG delimiter auto-detected: "~"
2024-10-26 21:41:46 DBG casting column 'id' as 'bigint'
2024-10-26 21:41:46 DBG casting column '_tp_time' as 'datetime'
2024-10-26 21:41:46 INF writing to target database [mode: incremental]
2024-10-26 21:41:46 INF streaming data (direct insert)
2024-10-26 21:41:46 DBG use `default`
2024-10-26 21:41:46 WRN Batch 1 failed, retrying in 1.23239622s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9072359916060183 of disk default exceeds the max_disk_util 0.9
2024-10-26 21:41:48 WRN Batch 1 failed, retrying in 1.801986813s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9072360203471316 of disk default exceeds the max_disk_util 0.9
2024-10-26 21:41:49 WRN Batch 1 failed, retrying in 5.960235655s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9072384263889012 of disk default exceeds the max_disk_util 0.9
2024-10-26 21:41:55 WRN Batch 1 failed, retrying in 4.899978337s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9072338811871282 of disk default exceeds the max_disk_util 0.9
2024-10-26 21:42:00 WRN Batch 1 failed, retrying in 7.414246406s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
code: 2529, message: The utilization 0.9072407338897109 of disk default exceeds the max_disk_util 0.9
^C
interrupting...
2024-10-26 21:42:05 DBG opened "proton" connection (conn-proton-WQ0)
2024-10-26 21:42:05 DBG closed "proton" connection (conn-proton-WQ0)
2024-10-26 21:42:05 DBG closed "proton" connection (conn-proton-wTy)
2024-10-26 21:42:08 WRN Batch 1 failed, retrying in 5.19163513s: ~ could not prepare statement for table: `default`.`ge8`, statement: insert into `default`.`ge8` (`id`, `_tp_time`) settings idempotent_id='1730004106778068724_1_batch_`default`.`ge8`' values ($1, $2)
context canceled
2024-10-26 21:42:10 INF execution failed
fatal:
--- proc.go:271 main ---
--- sling_cli.go:473 main ---
--- sling_cli.go:509 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:225 processRun ---
~ failure running task (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:396 runTask ---
--- task_run.go:151 Execute ---
--- proc.go:271 main ---
--- sling_cli.go:473 main ---
--- sling_cli.go:509 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:225 processRun ---
--- sling_run.go:396 runTask ---
--- task_run.go:137 Execute ---
Execution interrupted
(base) ➜ sling-cli git:(bugfix/correct-retry-with-idempotent-id) ✗ date
Sat 26 Oct 2024 09:43:29 PM PDT
(base) ➜ sling-cli git:(bugfix/correct-retry-with-idempotent-id) ✗
```
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
change:
generate a unique idempotent id before retry, ensure the retry will use a same id.
reason:
for
insert into v(id) values(1) settings idempotent_id='2003'insert into v(id) values(1) settings idempotent_id='2004'those are 2 different insert, for correctly retry, we need use a same id:
insert into v(id) values(1) settings idempotent_id='2003'insert into v(id) values(1) settings idempotent_id='2003'verified in commit msg
version bump:
sling-1.2.20-timeplus => sling-1.2.20-stable-timeplus ? or use sling-1.2.20.1-timeplus ?