Skip to content

Commit

Permalink
[core] Add basic implementation to support REST Catalog (#4553)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 authored Nov 28, 2024
1 parent cdd4061 commit 0fe18e8
Show file tree
Hide file tree
Showing 42 changed files with 2,054 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -54,13 +55,22 @@ public class ThreadPoolUtils {
* is max thread number.
*/
public static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix) {
return createCachedThreadPool(threadNum, namePrefix, new LinkedBlockingQueue<>());
}

/**
* Create a thread pool with max thread number and define queue. Inactive threads will
* automatically exit.
*/
public static ThreadPoolExecutor createCachedThreadPool(
int threadNum, String namePrefix, BlockingQueue<Runnable> workQueue) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
threadNum,
threadNum,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
workQueue,
newDaemonThreadFactory(namePrefix));
executor.allowCoreThreadTimeOut(true);
return executor;
Expand Down
57 changes: 57 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ under the License.

<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
<okhttp.version>4.12.0</okhttp.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -63,6 +64,14 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- REST Catalog dependencies -->

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down Expand Up @@ -204,6 +213,20 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand All @@ -219,6 +242,40 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-paimon</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*</artifact>
<excludes>
<exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<includes combine.children="append">
<include>com.squareup.okhttp3:okhttp</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>okhttp3</pattern>
<shadedPattern>org.apache.paimon.shade.okhttp3</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
public class DefaultErrorHandler extends ErrorHandler {
private static final ErrorHandler INSTANCE = new DefaultErrorHandler();

public static ErrorHandler getInstance() {
return INSTANCE;
}

@Override
public void accept(ErrorResponse error) {
int code = error.code();
switch (code) {
case 400:
throw new BadRequestException(
String.format("Malformed request: %s", error.message()));
case 401:
throw new NotAuthorizedException("Not authorized: %s", error.message());
case 403:
throw new ForbiddenException("Forbidden: %s", error.message());
case 405:
case 406:
break;
case 500:
throw new ServiceFailureException("Server error: %s", error.message());
case 501:
throw new UnsupportedOperationException(error.message());
case 503:
throw new ServiceUnavailableException("Service unavailable: %s", error.message());
}

throw new RESTException("Unable to process: %s", error.message());
}
}
26 changes: 26 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/ErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.responses.ErrorResponse;

import java.util.function.Consumer;

/** Error handler for REST client. */
public abstract class ErrorHandler implements Consumer<ErrorResponse> {}
142 changes: 142 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.responses.ErrorResponse;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
import static okhttp3.ConnectionSpec.MODERN_TLS;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** HTTP client for REST catalog. */
public class HttpClient implements RESTClient {

private final OkHttpClient okHttpClient;
private final String uri;
private final ObjectMapper mapper;
private final ErrorHandler errorHandler;

private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

public HttpClient(HttpClientOptions httpClientOptions) {
this.uri = httpClientOptions.uri();
this.mapper = httpClientOptions.mapper();
this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = httpClientOptions.errorHandler();
}

@Override
public <T extends RESTResponse> T get(
String path, Class<T> responseType, Map<String, String> headers) {
try {
Request request =
new Request.Builder()
.url(uri + path)
.get()
.headers(Headers.of(headers))
.build();
return exec(request, responseType);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public <T extends RESTResponse> T post(
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers) {
try {
RequestBody requestBody = buildRequestBody(body);
Request request =
new Request.Builder()
.url(uri + path)
.post(requestBody)
.headers(Headers.of(headers))
.build();
return exec(request, responseType);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
okHttpClient.dispatcher().cancelAll();
okHttpClient.connectionPool().evictAll();
}

private <T extends RESTResponse> T exec(Request request, Class<T> responseType) {
try (Response response = okHttpClient.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error =
new ErrorResponse(
responseBodyStr != null ? responseBodyStr : "response body is null",
response.code());
errorHandler.accept(error);
}
if (responseBodyStr == null) {
throw new RESTException("response body is null.");
}
return mapper.readValue(responseBodyStr, responseType);
} catch (Exception e) {
throw new RESTException(e, "rest exception");
}
}

private RequestBody buildRequestBody(RESTRequest body) throws JsonProcessingException {
return RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE);
}

private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) {
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);

OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(new Dispatcher(executorService))
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT));
httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
httpClientOptions.readTimeout().ifPresent(builder::readTimeout);

return builder.build();
}
}
Loading

0 comments on commit 0fe18e8

Please sign in to comment.