I currently have a PostgreSQL 15 master-slave cluster, and I want to use the CDC method to synchronize data to SelectDB Enterprise. The official documentation only mentions SelectDB Cloud as a sink. I wonder if this (SelectDB Cloud sink configuration) can be used? My current configuration file is as follows:
env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 60000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
Postgres-CDC {
#plugin_output = "customers_Postgre_cdc"
username = "postgres"
password = "webtangAa!1"
database-names = ["test"]
schema-names = ["public"]
table-names = ["test.public.test2"]
startup.mode = "initial"
slot.name = "seatunnel_win_1"
url = "jdbc:postgresql://localhost:5432/test?loggerLevel=OFF"
}
}
sink {
SelectDBCloud {
load-url = "192.168.137.11:8030"
jdbc-url = "192.168.137.11:9030"
cluster-name = "buzhidao"
table.identifier = "stshdb.test2"
username = "root"
password = "webtangAa@2"
selectdb.config {
file.type = "json"
}
}
}
It keeps reporting during startup:
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:266)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
... 5 more
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:220)
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:186)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:84)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
... 6 more
Caused by: org.postgresql.util.PSQLException: 错误: 复制槽名 "seatunnel_win_1" 已经存在
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2875)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2560)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:429)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:526)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:436)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:358)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:343)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:319)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:314)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394)
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:212)
... 11 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:258)
... 2 more
I didn't add the replication slot name, and now I don't know what caused it.
Please help and advise me. Thank you!
I currently have a PostgreSQL 15 master-slave cluster, and I want to use the CDC method to synchronize data to SelectDB Enterprise. The official documentation only mentions SelectDB Cloud as a sink. I wonder if this (SelectDB Cloud sink configuration) can be used? My current configuration file is as follows:
It keeps reporting during startup:
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:266)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
... 5 more
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:220)
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:186)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:84)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
... 6 more
Caused by: org.postgresql.util.PSQLException: 错误: 复制槽名 "seatunnel_win_1" 已经存在
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2875)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2560)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:429)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:526)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:436)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:358)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:343)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:319)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:314)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394)
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:212)
... 11 more
I didn't add the replication slot name, and now I don't know what caused it.
Please help and advise me. Thank you!