I am running the streamx master branch against an Apache Kafka 0.10 broker and trying to copy Kafka topic messages to S3. I get the error below when starting the connector. I suspect the S3 filesystem bucket and key names are not being passed properly to the underlying APIs. Please let me know if any Hadoop configuration property needs to be added (hdfs-site.xml is included below).
Command to start:
./connect-standalone.sh /app/kafka_2.11-0.10.2.1/config/connect-standalone.properties /app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/connector1.properties
Connector properties (connector1.properties):
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=test
topics.dir=test
logs.dir=logs
flush.size=3
s3.url=s3://platform.com/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner
Error:
Caused by: java.io.IOException: / doesn't exist
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:165)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:68)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:374)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:174)
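For what it's worth, the trace shows the failure path: the DataWriter constructor calls createDir, which goes through com.qubole.streamx.s3.S3Storage.mkdirs into Hadoop's org.apache.hadoop.fs.s3.S3FileSystem, where Jets3tFileSystemStore cannot find an INode for "/". A minimal standalone sketch like the following should exercise the same mkdirs call outside Kafka Connect (hypothetical S3MkdirsCheck class; it assumes the hdfs-site.xml from the hadoop.conf.dir above, shown further below, is loaded):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical standalone check: issues the same mkdirs call that
// S3Storage.mkdirs makes in the stack trace above.
public class S3MkdirsCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Load the same S3 settings the connector uses (path taken from hadoop.conf.dir).
        conf.addResource(new Path("/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(URI.create("s3://platform.com/data/rawdata"), conf);
        // This is the call that fails with "java.io.IOException: / doesn't exist".
        System.out.println(fs.mkdirs(new Path("s3://platform.com/data/rawdata/logs")));
    }
}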
Worker configuration (connect-standalone.properties):
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/kafkadata/kafka/connect.offsets
offset.flush.interval.ms=10000
hdfs-site.xml:
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>secret</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>secret</value>
  </property>
</configuration>
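One observation that may or may not be relevant: fs.s3.impl above selects org.apache.hadoop.fs.s3.S3FileSystem, the legacy block-based filesystem seen in the stack trace, which stores its own INode/block metadata inside the bucket and reports "/ doesn't exist" when that metadata is absent. A possible alternative to try (assuming the Hadoop build on the classpath ships it) is the native filesystem, configured with an s3n:// URL and the matching properties, e.g.:

<property>
  <name>fs.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>secret</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>secret</value>
</property>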