Skip to content

Commit

Permalink
feat: add default serverless
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Nov 23, 2023
1 parent 86b43b6 commit ff7ccd7
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.5.2-SNAPSHOT</s3stream.version>
<s3stream.version>0.5.3-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
7 changes: 6 additions & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.5.2-SNAPSHOT</version>
<version>0.5.3-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down Expand Up @@ -99,6 +99,11 @@
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.failover;

import com.automq.stream.utils.CommandResult;
import com.automq.stream.utils.CommandUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class DefaultServerless implements Serverless {
private static final String SERVERLESS_CMD = "/opt/automq/scripts/amq-serverless";

@Override
public String attach(String volumeId, int nodeId) throws ExecutionException {
String[] cmd = new String[]{SERVERLESS_CMD, "volume", "attach", "-v", volumeId, "-n", Integer.toString(nodeId)};
CommandResult result = CommandUtils.run();
check(cmd, result);
return jsonParse(result.stdout(), AttachResult.class).getDeviceName();
}

@Override
public void delete(String volumeId) throws ExecutionException {
String[] cmd = new String[]{SERVERLESS_CMD, "volume", "delete", "-v", volumeId};
CommandResult result = CommandUtils.run();
check(cmd, result);
}

@Override
public void fence(String volumeId) throws ExecutionException {
String[] cmd = new String[]{SERVERLESS_CMD, "volume", "fence", "-v", volumeId};
CommandResult result = CommandUtils.run();
check(cmd, result);
}

@Override
public List<FailedNode> scan() throws ExecutionException {
String[] cmd = new String[]{SERVERLESS_CMD, "volume", "query"};
CommandResult result = CommandUtils.run();
check(cmd, result);
QueryFailedNode[] nodes = jsonParse(result.stdout(), QueryFailedNode[].class);
return Arrays.stream(nodes).map(n -> {
FailedNode failedNode = new FailedNode();
failedNode.setNodeId(Integer.parseInt(n.getFirstBindNodeId()));
failedNode.setVolumeId(n.getVolumeId());
return failedNode;
}).collect(Collectors.toList());
}

private static void check(String[] cmd, CommandResult rst) throws ExecutionException {
throw new ExecutionException("Run " + Arrays.toString(cmd) + ", code:" + rst.code() + " failed: " + rst.stderr(), null);
}

private static <T> T jsonParse(String raw, Class<T> clazz) throws ExecutionException {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(raw, clazz);
} catch (JsonProcessingException e) {
throw new ExecutionException(e);
}
}

static class AttachResult {
private String deviceName;

public String getDeviceName() {
return deviceName;
}

public void setDeviceName(String deviceName) {
this.deviceName = deviceName;
}
}

static class QueryFailedNode {
private String firstBindNodeId;
private String volumeId;

public String getFirstBindNodeId() {
return firstBindNodeId;
}

public void setFirstBindNodeId(String firstBindNodeId) {
this.firstBindNodeId = firstBindNodeId;
}

public String getVolumeId() {
return volumeId;
}

public void setVolumeId(String volumeId) {
this.volumeId = volumeId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,39 @@
package com.automq.stream.s3.failover;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface Serverless {

/**
* Attach volume to the target node.
*
* @param volumeId volume id
* @param nodeId target node id
* @param nodeId target node id
* @return attached device name
*/
String attach(String volumeId, String nodeId);
String attach(String volumeId, int nodeId) throws ExecutionException;

/**
* Delete the volume
*
* @param volumeId volume id
*/
void delete(String volumeId);
void delete(String volumeId) throws ExecutionException;

/**
* Fence the first attached node access to the volume
*
* @param volumeId volume id
*/
void fence(String volumeId);
void fence(String volumeId) throws ExecutionException;

/**
* Scan failed node
*
* @return {@link FailedNode} list
*/
List<FailedNode> scan();
List<FailedNode> scan() throws ExecutionException;

class FailedNode {
private int nodeId;
Expand Down

0 comments on commit ff7ccd7

Please sign in to comment.