PostgreSQL 15 CDC to SelectDB Enterprise help #10624

@webtang2017

I have a PostgreSQL 15 master-slave cluster and want to use CDC to synchronize its data to SelectDB Enterprise. The official documentation only mentions SelectDB Cloud as a sink. Can the SelectDB Cloud sink configuration be used with SelectDB Enterprise? My current configuration file is as follows:

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  Postgres-CDC {
    #plugin_output = "customers_Postgre_cdc"
    username = "postgres"
    password = "webtangAa!1"
    database-names = ["test"]
    schema-names = ["public"]
    table-names = ["test.public.test2"]
    startup.mode = "initial"
    slot.name = "seatunnel_win_1"
    url = "jdbc:postgresql://localhost:5432/test?loggerLevel=OFF"
  }
}

sink {
  SelectDBCloud {
    load-url = "192.168.137.11:8030"
    jdbc-url = "192.168.137.11:9030"
    cluster-name = "buzhidao"
    table.identifier = "stshdb.test2"
    username = "root"
    password = "webtangAa@2"
    selectdb.config {
      file.type = "json"
    }
  }
}

The job keeps failing at startup with the following error:
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:266)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
... 5 more
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:220)
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:186)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:84)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
... 6 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "seatunnel_win_1" already exists
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2875)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2560)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:429)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:526)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:436)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:358)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:343)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:319)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:314)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394)
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:212)
... 11 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:258)
    ... 2 more

I did not add this replication slot myself, and I don't know what is causing the error.
Please help and advise me. Thank you!
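
For anyone trying to reproduce this: the innermost exception says that a slot with the name given in slot.name already exists on the PostgreSQL server. Below is a minimal Python sketch for checking that slot and, only if nothing is using it, dropping it. It relies on the standard pg_replication_slots view and the built-in pg_drop_replication_slot() function. The helper names (inspect_slot, drop_slot_if_inactive) are hypothetical, and conn is assumed to be a DB-API 2.0 connection that uses the %s parameter style (for example, one opened with psycopg2). Whether dropping the slot is the right fix here is itself an assumption, since the trace does not say where the slot came from.

```python
"""Sketch: inspect (and optionally drop) a PostgreSQL replication slot."""

SLOT_NAME = "seatunnel_win_1"  # taken from slot.name in the job configuration

# pg_replication_slots is a standard PostgreSQL system view.
INSPECT_SQL = (
    "SELECT slot_name, plugin, slot_type, active, active_pid "
    "FROM pg_replication_slots WHERE slot_name = %s"
)

# pg_drop_replication_slot() is built in; PostgreSQL rejects the call
# while the slot is still in use by a connection.
DROP_SQL = "SELECT pg_drop_replication_slot(%s)"

_COLUMNS = ("slot_name", "plugin", "slot_type", "active", "active_pid")


def inspect_slot(conn, slot_name=SLOT_NAME):
    """Return the slot's row as a dict, or None if no such slot exists.

    `conn` is assumed to be a DB-API 2.0 connection (hypothetical here).
    """
    cur = conn.cursor()
    try:
        cur.execute(INSPECT_SQL, (slot_name,))
        row = cur.fetchone()
    finally:
        cur.close()
    return None if row is None else dict(zip(_COLUMNS, row))


def drop_slot_if_inactive(conn, slot_name=SLOT_NAME):
    """Drop the slot only if it exists and is not active. Return True if dropped."""
    info = inspect_slot(conn, slot_name)
    if info is None or info["active"]:
        return False
    cur = conn.cursor()
    try:
        cur.execute(DROP_SQL, (slot_name,))
    finally:
        cur.close()
    conn.commit()
    return True
```

If inspect_slot reports active as true, another process (possibly an earlier run of the same job) still holds the slot, and the sketch leaves it alone. Dropping a slot discards the change position it was retaining, so this only makes sense when that earlier state is no longer needed.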
