Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23441 Sql. Cancellation of script execution #4706

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
21 changes: 21 additions & 0 deletions modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
void executeScript(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @throws SqlException If failed.
*/
void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);
zstan marked this conversation as resolved.
Show resolved Hide resolved

/**
* Executes a multi-statement SQL query.
*
Expand All @@ -389,4 +399,15 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @return Operation future.
* @throws SqlException If failed.
*/
CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public static CompletableFuture<Void> process(
HybridTimestamp clientTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
transactions.updateObservableTimestamp(clientTs);

// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Pass cancellation token to the query processor.
return IgniteSqlImpl.executeScriptCore(
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, arguments, props.toSqlProps()
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, null, arguments, props.toSqlProps()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public void executeScript(String query, @Nullable Object... arguments) {
}
}

/** {@inheritDoc} */
@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
Expand Down Expand Up @@ -335,6 +342,14 @@ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object
return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
}

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

private static void packProperties(
PayloadOutputChannel w,
@Nullable Map<String, Object> statementProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,22 @@ public void executeScript(String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(query, arguments));
}

@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(cancellationToken, query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(cancellationToken, query, arguments));
}

@Override
public <T> T unwrap(Class<T> classToUnwrap) {
return attachmentLock.attached(ignite -> Wrappers.unwrap(ignite.sql(), classToUnwrap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -36,6 +39,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -44,7 +48,10 @@
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.ColumnMetadataImpl;
import org.apache.ignite.internal.sql.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.CursorClosedException;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.ErrorGroups.Sql;
Expand All @@ -65,10 +72,12 @@
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AssertionFailureBuilder;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -929,6 +938,51 @@ public void testQueryTimeout() {
});
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23693")
public void cancelScript() {
IgniteSql sql = igniteSql();

sql("CREATE TABLE test (id INT PRIMARY KEY);");

// DML is used because the cursor will be closed as soon as the first page is ready.
String script =
"INSERT INTO test SELECT x FROM system_range(0, 10000000000);"
+ "SELECT 1;";

CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();

CompletableFuture<Void> scriptFut = IgniteTestUtils.runAsync(() -> executeScript(sql, token, script));

// Wait until FIRST script statement is started to execute.
Awaitility.await().untilAsserted(() -> assertThat(queryProcessor().runningQueries(), greaterThan(1)));

assertThat(scriptFut.isDone(), is(false));

String expectedErrMsg = "The query was cancelled while executing.";

cancelHandle.cancel();

assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
expectedErrMsg,
() -> IgniteTestUtils.await(scriptFut)
);

assertThat(queryProcessor().runningQueries(), is(0));
assertThat(txManager().pending(), is(0));

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
expectedErrMsg,
() -> executeScript(sql, token, "SELECT 1; SELECT 2;")
);
assertThat(queryProcessor().runningQueries(), is(0));
assertThat(txManager().pending(), is(0));
}

@Test
public void testQueryTimeoutIsPropagatedFromTheServer() throws Exception {
Statement stmt = igniteSql().statementBuilder()
Expand Down Expand Up @@ -1041,6 +1095,8 @@ protected ResultProcessor execute(IgniteSql sql, String query, Object... args) {

protected abstract void executeScript(IgniteSql sql, String query, Object... args);

protected abstract void executeScript(IgniteSql sql, CancellationToken token, String query, Object... args);

protected abstract void rollback(Transaction outerTx);

protected abstract void commit(Transaction outerTx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.api;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -55,7 +56,6 @@
/**
* Tests for asynchronous SQL API.
*/
@SuppressWarnings("ThrowableNotThrown")
public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest {
@Test
public void pageSequence() {
Expand Down Expand Up @@ -120,6 +120,13 @@ public void cancelQueryString() throws InterruptedException {

return sql.executeAsync(transaction, token, query);
});

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
cancelHandle.cancel();

expectQueryCancelled(() -> await(sql.executeAsync(null, token, "SELECT 1")));
}

@Test
Expand Down Expand Up @@ -221,6 +228,11 @@ protected void executeScript(IgniteSql sql, String query, Object... args) {
await(sql.executeScriptAsync(query, args));
}

@Override
protected void executeScript(IgniteSql sql, CancellationToken cancellationToken, String query, Object... args) {
await(sql.executeScriptAsync(cancellationToken, query, args));
}

@Override
protected void rollback(Transaction tx) {
await(tx.rollbackAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for asynchronous client SQL API.
Expand All @@ -41,18 +42,27 @@ public void stopClient() {
client.close();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelQueryString() throws InterruptedException {
super.cancelQueryString();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelStatement() throws InterruptedException {
super.cancelStatement();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelScript() {
super.cancelScript();
}

@Override
protected IgniteSql igniteSql() {
return client.sql();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for synchronous client SQL API.
Expand All @@ -41,18 +42,27 @@ public void stopClient() {
client.close();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelQueryString() throws InterruptedException {
super.cancelQueryString();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelStatement() throws InterruptedException {
super.cancelStatement();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelScript() {
super.cancelScript();
}

@Override
protected IgniteSql igniteSql() {
return client.sql();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.api;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -74,8 +75,19 @@ public void cancelQueryString() throws InterruptedException {
.build();

Transaction transaction = igniteTx().begin();

transactions.add(transaction);

return sql.execute(transaction, token, statement);
});

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
cancelHandle.cancel();

//noinspection resource
expectQueryCancelled(() -> sql.execute(null, token, "SELECT 1"));
}

@Test
Expand All @@ -91,6 +103,9 @@ public void cancelStatement() throws InterruptedException {
// with transaction
executeAndCancel((token) -> {
Transaction transaction = igniteTx().begin();

transactions.add(transaction);

return sql.execute(transaction, token, query);
});
}
Expand Down Expand Up @@ -160,6 +175,11 @@ protected void executeScript(IgniteSql sql, String query, Object... args) {
sql.executeScript(query, args);
}

@Override
protected void executeScript(IgniteSql sql, CancellationToken cancellationToken, String query, Object... args) {
sql.executeScript(cancellationToken, query, args);
}

@Override
protected void rollback(Transaction tx) {
tx.rollback();
Expand Down
Loading