Flink Issues

Last updated on January 18, 2025 am

🧙 Questions

☄️ Ideas

Unable to create a source for reading table

The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.from_table'.

Table options are:

'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='ispong123'
'table-name'='users'
'url'='jdbc:mysql://isxcode:30306/ispong_db'
'username'='root'
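Solution

The table cannot be read because the JDBC driver cannot be found. Add the MySQL driver and the Flink JDBC connector to Flink's lib directory.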

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz
tar -vzxf mysql-connector-java-8.0.22.tar.gz 
mv mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar /opt/flink/lib/

cd /opt/flink/lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.0/flink-connector-jdbc_2.12-1.14.0.jar
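After copying the jars, it can help to confirm that both are really in Flink's lib directory before resubmitting the job. The following is a minimal sketch, not part of the original fix: `check_jars` is a hypothetical helper name, and the glob patterns are assumptions based on the file names used above.

```shell
# check_jars DIR PATTERN...
# Prints whether each jar pattern matches at least one file in DIR.
# Returns non-zero if any pattern has no match.
# (Hypothetical helper; the name and patterns are illustrative.)
check_jars() {
  dir=$1
  shift
  missing=0
  for pattern in "$@"; do
    # $pattern is left unquoted on purpose so the shell expands the glob
    if ls "$dir"/$pattern >/dev/null 2>&1; then
      echo "found:   $pattern"
    else
      echo "missing: $pattern"
      missing=1
    fi
  done
  return "$missing"
}

# Example invocation against the directory used above:
# check_jars /opt/flink/lib 'mysql-connector-java-*.jar' 'flink-connector-jdbc_*.jar'
```

If either pattern is reported as missing, the corresponding download or move step probably did not complete.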

check with the configuration ‘classloader.check-leaked-classloader’

Exception in thread "Thread-3" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
        at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2647)
        at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:2905)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2864)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2838)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2715)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1186)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1774)
        at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
        at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
        at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
Solution
sudo vim /opt/flink/conf/flink-conf.yaml

Add the following line directly; it is not present by default.

classloader.check-leaked-classloader: false
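Editing the file by hand works, but when the same setting has to be applied on several machines a small script avoids duplicate lines. The sketch below is one possible way to do it; `set_conf` is a hypothetical helper, not a Flink command, and it assumes the file uses flat `key: value` lines as in the default flink-conf.yaml.

```shell
# set_conf FILE KEY VALUE
# Writes "KEY: VALUE" to FILE, replacing any existing line for KEY,
# so running it twice does not duplicate the setting.
# (Hypothetical helper, not a Flink command.)
set_conf() {
  file=$1
  key=$2
  value=$3
  tmp=$(mktemp)
  # Keep every line that does not start with "KEY:"
  awk -v k="$key:" 'index($0, k) != 1' "$file" > "$tmp"
  printf '%s: %s\n' "$key" "$value" >> "$tmp"
  # Copy back with cat so the original file keeps its owner and mode
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Example invocation (may need sudo, as with the vim command above):
# set_conf /opt/flink/conf/flink-conf.yaml classloader.check-leaked-classloader false
```

Commented-out lines such as `#classloader...` are left untouched, since only lines that begin with the key itself are replaced.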

Caused by: java.lang.ClassCastException

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:317)
	... 6 more
Solution
sudo vim /opt/flink/conf/flink-conf.yaml
classloader.resolve-order: parent-first
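An error where a class cannot be assigned to a field of the very same type usually means the class was loaded twice by different classloaders. Before or after switching the resolve order, it may be useful to see which jars carry the class. This is a minimal sketch that assumes `unzip` is installed; `jars_with_class` is a hypothetical helper and the job jar path in the example is a placeholder.

```shell
# jars_with_class CLASS JAR...
# Prints every jar that contains the given fully qualified class.
# (Hypothetical helper; relies on the listing format of `unzip -l`.)
jars_with_class() {
  # Convert a.b.C into the archive entry name a/b/C.class
  entry="$(printf '%s' "$1" | tr '.' '/').class"
  shift
  for jar in "$@"; do
    # The last column of each `unzip -l` line is the entry name
    if unzip -l "$jar" 2>/dev/null |
        awk -v e="$entry" '$NF == e { found = 1 } END { exit !found }'; then
      echo "$jar"
    fi
  done
}

# Example invocation (job jar path is a placeholder):
# jars_with_class org.apache.commons.collections.map.LinkedMap /path/to/job.jar /opt/flink/lib/*.jar
```

If the class shows up in both the job jar and a jar under /opt/flink/lib, that duplication is a likely source of the conflict.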

Please check the ‘yarn.scheduler.maximum-allocation-mb’

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:426)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:606)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:860)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:860)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The cluster does not have the requested resources for the TaskManagers available!
Maximum Memory: 4707 Requested: 5120MB. Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values

        at org.apache.flink.yarn.YarnClusterDescriptor.validateClusterResources(YarnClusterDescriptor.java:670)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:589)
        at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:419)
        ... 7 more
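Solution
yarn.nodemanager.resource.memory-mb: the amount of memory on each NodeManager that YARN can allocate to containers
yarn.scheduler.maximum-allocation-mb: the maximum memory that a single container request can be granted

If Flink's configuration requests more memory than these limits allow, deployment fails with this error.
In this case 5120 MB was requested but only 4707 MB was available, so increase yarn.nodemanager.resource.memory-mb to more than 5120 MB.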
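The comparison behind this error can be sketched as simple arithmetic on the two numbers from the message. The helper below is illustrative only: `fits_yarn` is a hypothetical name, and it just compares a requested size with an available size in MB. It does not query YARN.

```shell
# fits_yarn REQUESTED_MB AVAILABLE_MB
# Reports whether a memory request fits within the available memory.
# (Hypothetical helper for illustration; it does not talk to YARN.)
fits_yarn() {
  requested=$1
  available=$2
  if [ "$requested" -le "$available" ]; then
    echo "ok: ${requested}MB fits within ${available}MB"
  else
    echo "too large: requested ${requested}MB, available ${available}MB, short by $((requested - available))MB"
    return 1
  fi
}

# Values taken from the error message above:
# fits_yarn 5120 4707
# prints "too large: requested 5120MB, available 4707MB, short by 413MB"
```

With the numbers from the stack trace the request is 413 MB over the limit, which is why the YARN memory setting has to be raised above 5120 MB.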

Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration

java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:38) ~[flink-shaded-hadoop-2-2.7.5-10.0.jar:2.7.5-10.0]
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:36) ~[flink-shaded-hadoop-2-2.7.5-10.0.jar:2.7.5-10.0]
        at org.apache.hadoop.security.UserGroupInformation$UgiMetrics.create(UserGroupInformation.java:121) ~[flink-shaded-hadoop-2-2.7.5-10.0.jar:2.7.5-10.0]
        at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:237) ~[flink-shaded-hadoop-2-2.7.5-10.0.jar:2.7.5-10.0]
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:63) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
        at org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:91) [flink-dist_2.12-1.12.7.jar:1.12.7]
        at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:58) [flink-dist_2.12-1.12.7.jar:1.12.7]
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:858) [flink-dist_2.12-1.12.7.jar:1.12.7]
Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_342]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_342]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_342]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_342]
        ... 8 more
Solution
cd /opt/flink/lib
wget https://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar

java.lang.NoClassDefFoundError: org/apache/htrace/Trace

java.lang.NoClassDefFoundError: org/apache/htrace/Trace
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:213)
        at com.sun.proxy.$Proxy24.getClusterNodes(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:266)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy25.getClusterNodes(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:504)
        at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:44)
        at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:321)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:534)
        at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:418)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:606)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:860)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:860)
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 22 more
Solution
cd /opt/flink/lib
wget https://repo1.maven.org/maven2/org/apache/htrace/htrace-core/3.1.0-incubating/htrace-core-3.1.0-incubating.jar
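The download URLs used for these missing-class fixes follow the standard Maven repository layout: the group ID with dots turned into slashes, then the artifact ID, the version, and a file named `artifact-version.jar`. The sketch below builds such a URL from coordinates, which may help when another class turns out to be missing; `maven_url` is a hypothetical helper name.

```shell
# maven_url GROUP_ID ARTIFACT_ID VERSION
# Prints the repo1.maven.org URL of the jar for the given coordinates.
# (Hypothetical helper; it only builds the string and downloads nothing.)
maven_url() {
  # Group IDs use dots, repository paths use slashes
  group_path=$(printf '%s' "$1" | tr '.' '/')
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
    "$group_path" "$2" "$3" "$2" "$3"
}

# Example invocation, reproducing the htrace download above:
# cd /opt/flink/lib && wget "$(maven_url org.apache.htrace htrace-core 3.1.0-incubating)"
```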

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:385)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:295)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:266)
	at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
	at com.isxcode.acorn.job.SqlJob.main(SqlJob.java:12)
Solution
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.14.0</version>
</dependency>


Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
        at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
        at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
        ... 63 more
Solution
vim /opt/flink/conf/flink-conf.yaml

classloader.resolve-order: parent-first


2022-11-08 14:47:32,430 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3289)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3334)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3373)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:233)
        at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:784)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:475)
        at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2043)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
        at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:773)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:754)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
        at com.isxcode.acorn.job.SqlJob.main(SqlJob.java:37)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Solution
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-yarn_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
                <exclusion>
                        <artifactId>flink-shaded-hadoop2</artifactId>
                        <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-common</artifactId>
                </exclusion>
                <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-yarn-common</artifactId>
                </exclusion>
                <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-yarn-client</artifactId>
                </exclusion>
                <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-mapreduce-client-core</artifactId>
                </exclusion>
        </exclusions>
</dependency>
Garbled Chinese characters
vim conf/flink-conf.yaml
env.java.opts: "-Dfile.encoding=UTF-8"
please declare primary key for sink table when query contains update/delete record
Solution
The sink table must declare a primary key, and the key column must appear in the column list. The `id` column and its type below are assumed, because the original listing omitted it.
CREATE TABLE sink_t
(
  `id` bigint, -- assumed column and type
  `associate_amount` decimal(18, 3),
  `plate` varchar(50),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
      'connector' = 'mysql-x',
      'url' = 'jdbc:mysql://192.168.168.116:30131/ispong_db',
      'table-name' = 'sales_mgt_payment_plantest',
      'username' = 'root',
      'password' = '');

https://ispong.isxcode.com/hadoop/flink/flink 问题/
Author
ispong
Posted on
August 9, 2021