Skip to content

Commit

Permalink
完善gateway客户端的抽象,对齐两个抽象层的使用,新增测试用例
Browse files Browse the repository at this point in the history
  • Loading branch information
reatang committed May 7, 2024
1 parent c714ee5 commit e9f8965
Show file tree
Hide file tree
Showing 28 changed files with 756 additions and 177 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea
/vendor/
.DS_Store
.DS_Store
.phpunit.result.cache
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ abstract grpc and grpc-gateway
- [x] 添加解析 protobuf Any 参数的工具 UtilAny
- [x] 一元请求重试中间件
- [x] 支持配置化的默认调用行为
- [x] 支持OTLP链路追踪
- [x] 支持OpenTelemetry链路追踪(grpc、gateway都已支持)
- [x] 开发自动生成grpc原生客户端抽象层 [protoc-gen-php-abs-grpc](https://github.com/reatang/protoc-gen-php-abs-grpc)
- [ ] 开发自动生成grpc-gateway抽象层 `很显然,没搞呢`

Expand Down
28 changes: 28 additions & 0 deletions src/Client/GatewayBaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use GuzzleHttp\RequestOptions;
use Psr\Http\Message\ResponseInterface;
use Reatang\GrpcPHPAbstract\Exceptions\GrpcException;
use Reatang\GrpcPHPAbstract\Exceptions\GrpcPhpClientException;
use Reatang\GrpcPHPAbstract\Metadata\GatewayHandle;
use Reatang\GrpcPHPAbstract\Metadata\Metadata;
use Reatang\GrpcPHPAbstract\Middlewares\GatewayMiddleware;
Expand Down Expand Up @@ -42,6 +43,7 @@ public function __construct(string $host, array $middleware = [])
$h->push(Middleware::httpErrors(), 'http_errors');
$h->push(Middleware::prepareBody(), 'prepare_body');
$h->push(GatewayMiddleware::retry(), 'retry');
$h->push(GatewayMiddleware::openTelemetryTrace(), "open_telemetry");

$this->client = new Client([
'base_uri' => $this->host(),
Expand Down Expand Up @@ -92,6 +94,28 @@ public function addRoute(string $methodName, string $url, string $response)
}
}

/**
* 处理各种额外选项
*
* @param $reqOpts
*
* @return array
* @throws GrpcPhpClientException
*/
private function requestOpts($reqOpts) : array
{
$opts = [];
if (isset($reqOpts[Options::Metadata])) {
if (!($reqOpts[Options::Metadata] instanceof Metadata)) {
throw new GrpcPhpClientException("Request Options: [Metadata] need " . Metadata::class);
}

$opts[RequestOptions::HEADERS] = GatewayHandle::toHeader($reqOpts[Options::Metadata]);
}

return $opts;
}

/**
* @param $name
* @param $arguments
Expand All @@ -113,6 +137,10 @@ public function __call($name, $arguments)
$opts[RequestOptions::BODY] = $arguments[0]->serializeToJsonString();
}

if (isset($arguments[1])) {
$opts = array_merge($opts, $this->requestOpts($arguments[1]));
}

$response = $this->client->request('POST', $route->getUrl(), $opts);
} catch (ServerException $exception) {
throw $this->exception($exception);
Expand Down
32 changes: 28 additions & 4 deletions src/Client/GrpcBaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,25 @@ protected function metadata(AbstractCall $call): Metadata
return GrpcHandle::parseCall($call);
}

/**
* @param Metadata $metadata
*
* @return array
*/
private function requestMetadata(Metadata $metadata) : array
{
$md = $metadata->header->toArray();
foreach ($md as $k => $v) {
if (!is_array($v)) {
$v = [$v];
}

$md[$k] = $v;
}

return $md;
}

/**
* 返回原生的grpc调用结果
*
Expand All @@ -153,16 +172,21 @@ protected function metadata(AbstractCall $call): Metadata
*/
protected function rawCall($method, $arguments)
{
if (!isset($arguments[1])) {
$arguments[1] = [];
$rawArguments[0] = $arguments[0];
if (isset($arguments[1]) && $arguments[1][Options::Metadata]) {
$rawArguments[1] = $this->requestMetadata($arguments[1][Options::Metadata]);
} else {
$rawArguments[1] = [];
}

// timeout 单位:微秒
if ($this->timeout > 0) {
$arguments[2]['timeout'] = $this->timeout * 1000;
$rawArguments[2] = [
'timeout' => $this->timeout * 1000,
];
}

$call = call_user_func_array([$this->client, ucfirst($method)], $arguments);
$call = call_user_func_array([$this->client, ucfirst($method)], $rawArguments);

return $call->wait();
}
Expand Down
8 changes: 8 additions & 0 deletions src/Client/Options.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Reatang\GrpcPHPAbstract\Client;

class Options
{
public const Metadata = "metadata";
}
5 changes: 5 additions & 0 deletions src/Exceptions/GrpcPhpClientException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php

namespace Reatang\GrpcPHPAbstract\Exceptions;

class GrpcPhpClientException extends \Exception{ }
4 changes: 2 additions & 2 deletions src/Metadata/GatewayHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public static function toHeader(Metadata $metadata): array
{
$n = [];
foreach ($metadata->header->toArray() as $h => $v) {
$n[GatewayHandle::MetadataHeaderPrefix . $h] = is_array($v) ? join(',', $v) : $v;
$n[GatewayHandle::MetadataHeaderPrefix . ucfirst($h)] = is_array($v) ? join(',', $v) : $v;
}
foreach ($metadata->trailer->toArray() as $h => $v) {
$n[GatewayHandle::MetadataTrailerPrefix . $h] = is_array($v) ? join(',', $v) : $v;
$n[GatewayHandle::MetadataTrailerPrefix . ucfirst($h)] = is_array($v) ? join(',', $v) : $v;
}

return $n;
Expand Down
61 changes: 57 additions & 4 deletions src/Middlewares/GatewayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@

namespace Reatang\GrpcPHPAbstract\Middlewares;

use GuzzleHttp\Psr7\Response;
use OpenTelemetry\API\Globals;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SemConv\TraceAttributes;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Reatang\GrpcPHPAbstract\Metadata\GatewayHandle;
use Reatang\GrpcPHPAbstract\Metadata\Metadata;
use Reatang\GrpcPHPAbstract\Utils\MetadataAccessGetterSetter;

class GatewayMiddleware
{
Expand All @@ -24,8 +32,8 @@ class GatewayMiddleware
*/
public static function GrpcGatewayOpt($timeout): callable
{
return static function (callable $handler) use ($timeout): callable {
return static function (RequestInterface $request, array $options) use ($handler, $timeout) {
return function (callable $handler) use ($timeout): callable {
return function (RequestInterface $request, array $options) use ($handler, $timeout) {
$request = $request
->withAddedHeader('Accept', '*')
->withAddedHeader(GatewayHandle::metadataGrpcTimeout, $timeout);
Expand All @@ -45,8 +53,8 @@ public static function GrpcGatewayOpt($timeout): callable
*/
public static function GrpcMetadata(array $headers, array $trailers = []): callable
{
return static function (callable $handler) use ($headers, $trailers) : callable {
return static function (RequestInterface $request, array $options) use ($headers, $trailers, $handler) {
return function (callable $handler) use ($headers, $trailers) : callable {
return function (RequestInterface $request, array $options) use ($headers, $trailers, $handler) {
$md = Metadata::create($headers, $trailers);

foreach (GatewayHandle::toHeader($md) as $k => $v) {
Expand All @@ -72,4 +80,49 @@ public static function retry(int $maxAttempt = 3, int $delay = 300): callable
return new GatewayRetry($maxAttempt, $delay, $handler);
};
}

public static function openTelemetryTrace(): callable
{
$tracer = Globals::tracerProvider()->getTracer("reatang/grpc-php-abstract");

return function (callable $handler) use ($tracer) {
return function (RequestInterface $request, array $options) use ($handler, $tracer) {
$name = sprintf("POST %s", $request->getUri()->getPath());

$span = $tracer->spanBuilder($name)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent(Context::getCurrent())
->startSpan();
$ctx = $span->storeInContext(Context::getCurrent());
$scope = $span->activate();

$metadata = [];
Globals::propagator()->inject($metadata, MetadataAccessGetterSetter::getInstance(), $ctx);

foreach (GatewayHandle::toHeader(Metadata::create($metadata)) as $k => $v) {
$request = $request->withAddedHeader($k, $v);
}

/** @var \GuzzleHttp\Promise\FulfilledPromise $promise */
$promise = $handler($request, $options);

$promise->then(function (ResponseInterface $response) use ($span, $scope) {
$span->setAttributes([
TraceAttributes::HTTP_STATUS_CODE => $response->getStatusCode(),
]);
$span->setStatus(StatusCode::STATUS_OK)->end();
$scope->detach();
}, function (\Throwable $e) use ($span, $scope) {
$span->setAttributes([
TraceAttributes::EXCEPTION_TYPE => get_class($e),
TraceAttributes::EXCEPTION_MESSAGE => $e->getMessage(),
]);
$span->setStatus(StatusCode::STATUS_ERROR, $e->getMessage())->end();
$scope->detach();
});

return $promise;
};
};
}
}
2 changes: 1 addition & 1 deletion src/Middlewares/GatewayRetry.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private function onRejected(RequestInterface $req, array $options)
{
return function ($reason) use ($req, $options) {
if (!$this->decider($options['retries'], $req, null, $reason)) {
return \GuzzleHttp\Promise\rejection_for($reason);
return \GuzzleHttp\Promise\Create::rejectionFor($reason);
}

return $this->doRetry($req, $options);
Expand Down
3 changes: 3 additions & 0 deletions src/Middlewares/GrpcOpenTelemetryTrace.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public function interceptUnaryUnary(
$span->setAttribute(TraceAttributes::RPC_GRPC_STATUS_CODE, $status->code);

if ($status->code != \Grpc\STATUS_OK) {
$span->setAttributes([
TraceAttributes::RPC_GRPC_STATUS_CODE => $status->code,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $status->details)->end();
} else {
$span->setStatus(StatusCode::STATUS_OK)->end();
Expand Down
30 changes: 30 additions & 0 deletions tests/Features/BaseGatewayTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Reatang\GrpcPHPAbstract\Tests\Features;

use GuzzleHttp\RequestOptions;
use Reatang\GrpcPHPAbstract\Client\Options;
use Reatang\GrpcPHPAbstract\Metadata\Metadata;
use Reatang\GrpcPHPAbstract\Tests\Mock\PB\PingRequest;
use Reatang\GrpcPHPAbstract\Tests\Mock\TestServer;
use Reatang\GrpcPHPAbstract\Tests\Mock\TestServerClient;
use Reatang\GrpcPHPAbstract\Tests\TestCase;

class BaseGatewayTest extends TestCase
{
public function testPing()
{
$response = $this->getMockGatewayClient()->Ping(new PingRequest(["ping" => "test"]));
$this->assertEquals($response->getPong(), "PONG");
}

public function testMetadata()
{
$metadata = "123";
$response = $this->getMockGatewayClient()->Ping(new PingRequest(["ping" => "metadata"]), [
Options::Metadata => Metadata::create(["abc" => $metadata]),
]);

$this->assertEquals($response->getPong(), "PONG" . $metadata);
}
}
11 changes: 5 additions & 6 deletions tests/Features/BaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace Reatang\GrpcPHPAbstract\Tests\Features;

use GuzzleHttp\RequestOptions;
use Reatang\GrpcPHPAbstract\Client\Options;
use Reatang\GrpcPHPAbstract\Metadata\Metadata;
use Reatang\GrpcPHPAbstract\Tests\Mock\PB\PingRequest;
use Reatang\GrpcPHPAbstract\Tests\Mock\TestServer;
use Reatang\GrpcPHPAbstract\Tests\Mock\TestServerClient;
Expand All @@ -18,14 +21,10 @@ public function testPing()
public function testMetadata()
{
$metadata = "123";
/** @var TestServerClient $c */
$c = $this->getMockClient()->rawClient();
$call = $c->Ping(new PingRequest(["ping" => "metadata"]), [
"abc" => [$metadata],
$response = $this->getMockClient()->Ping(new PingRequest(["ping" => "metadata"]), [
Options::Metadata => Metadata::create(["abc" => $metadata]),
]);

[$response, $status] = $call->wait();
$this->assertEquals($status->code, 0);
$this->assertEquals($response->getPong(), "PONG" . $metadata);
}
}
61 changes: 61 additions & 0 deletions tests/Features/OpenTelemetryGatewayTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace Reatang\GrpcPHPAbstract\Tests\Features;

use OpenTelemetry\API\Baggage\Baggage;
use OpenTelemetry\API\Baggage\Propagation\BaggagePropagator;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\Context\Propagation\MultiTextMapPropagator;
use OpenTelemetry\SDK\Sdk;
use OpenTelemetry\SDK\Trace\Sampler\AlwaysOnSampler;
use OpenTelemetry\SDK\Trace\Sampler\ParentBased;
use OpenTelemetry\SDK\Trace\SpanProcessor\NoopSpanProcessor;
use OpenTelemetry\SDK\Trace\TracerProvider;
use Reatang\GrpcPHPAbstract\Tests\Mock\PB\OTelRequest;
use Reatang\GrpcPHPAbstract\Tests\TestCase;

class OpenTelemetryGatewayTest extends TestCase
{
protected function setUp(): void
{
parent::setUp();

// $processor = new SimpleSpanProcessor((new ConsoleSpanExporterFactory())->create());
$processor = new NoopSpanProcessor();
$tracerProvider = new TracerProvider(
$processor,
new ParentBased(new AlwaysOnSampler()),
);

Sdk::builder()
->setTracerProvider($tracerProvider)
->setPropagator(new MultiTextMapPropagator([
TraceContextPropagator::getInstance(),
BaggagePropagator::getInstance(),
]))
->setAutoShutdown(true)
->buildAndRegisterGlobal();
}

public function testBase()
{
$response = $this->getMockGatewayClient()->OTel(new OTelRequest());

$this->assertTrue(!empty($response->getTrace()));
$this->assertNotEquals($response->getTrace(), '00000000000000000000000000000000');
}

public function testBaggage()
{
$baggageVar = "123";
$scope = Baggage::getCurrent()->toBuilder()->set("baggage1", $baggageVar)->build()->activate();

$response = $this->getMockGatewayClient()->OTel(new OTelRequest());

$scope->detach();

$this->assertTrue(!empty($response->getTrace()));
$this->assertNotEquals($response->getTrace(), '00000000000000000000000000000000');
$this->assertEquals($baggageVar, $response->getBaggage());
}
}
Loading

0 comments on commit e9f8965

Please sign in to comment.