-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Storage Squid integration with Argus/Colossus #5001
Changes from 15 commits
de5166d
8f6e650
1757c48
65f3220
bb195a8
d1e0ba3
c758662
7bc5bd3
44240df
01d1f29
986d4d0
d6cf6ba
6a20fa7
a1355e9
a23306e
877f53b
5993958
2bbf0af
8c03814
f63efd1
36212f0
83d3f43
7d5d37f
3a6b0e4
efcab07
adb71b7
e2ce216
04b5cc6
0b8c3f8
fd44d0d
7ff9f63
190daa0
897c7c0
b625322
d8476d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,11 +49,11 @@ GRAPHQL_SERVER_HOST=localhost | |
# Websocket RPC endpoint containers will use. | ||
JOYSTREAM_NODE_WS=ws://joystream-node:9944/ | ||
|
||
# Query node which colossus will use | ||
COLOSSUS_QUERY_NODE_URL=http://graphql-server:8081/graphql | ||
# Query node (graphql-server) which colossus will use | ||
COLOSSUS_QUERY_NODE_URL=http://squid-graphql-server:4352/graphql | ||
|
||
# Query node which distributor will use | ||
DISTRIBUTOR_QUERY_NODE_URL=http://graphql-server:8081/graphql | ||
# Query node (graphql-server) which distributor will use | ||
DISTRIBUTOR_QUERY_NODE_URL=http://squid-graphql-server:4352/graphql | ||
|
||
# Indexer gateway used by processor. If you don't use the local indexer set this to a remote gateway | ||
PROCESSOR_INDEXER_GATEWAY=http://hydra-indexer-gateway:4000/graphql | ||
|
@@ -155,7 +155,7 @@ [email protected] | |
SQD_DEBUG=api:* | ||
OPENAPI_PLAYGROUND=true | ||
|
||
ARCHIVE_GATEWAY_URL=${CUSTOM_ARCHIVE_GATEWAY_URL:-http://squid-archive-gateway:8000/graphql} | ||
ORION_ARCHIVE_GATEWAY_URL=${CUSTOM_ARCHIVE_GATEWAY_URL:-http://squid-archive-gateway:8000/graphql} | ||
|
||
# ===================================================================================== | ||
|
||
|
@@ -169,3 +169,22 @@ TELEMETRY_ENDPOINT=http://collector:4318 | |
# We do not provide a default value - scripts that start up a joystream-node service | ||
# should be explicit about what version to use. | ||
# JOYSTREAM_NODE_TAG=latest | ||
|
||
# ===================================================================================== | ||
## Storage-Squid configuration | ||
|
||
# Db config | ||
SQUID_DB_HOST=squid_db | ||
SQUID_DB_NAME=squid | ||
SQUID_DB_PASS=squid | ||
SQUID_DB_PORT=23332 | ||
|
||
# Processor service prometheus port | ||
SQUID_PROCESSOR_PROMETHEUS_PORT=3337 | ||
|
||
# Graphql server port | ||
SQUID_GQL_PORT=4352 | ||
|
||
# Archive gateway host (Should not be set in local development) | ||
# For running a production storage-squid instance uncomment the following line (to use the subsquid hosted archive) | ||
# SQUID_ARCHIVE_GATEWAY_URL=${CUSTOM_ARCHIVE_GATEWAY_URL:-https://v2.archive.subsquid.io/network/joystream} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
id: test-node | ||
endpoints: | ||
queryNode: http://localhost:8081/graphql | ||
queryNode: http://localhost:4352/graphql | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should make it clear that this is not the query node now. I suggest to rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't mind the name so much. Just wondering where the port number selection came from, is it just 4250 (orion graphql port) + 2 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@mnaamani yes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It's not purely cosmetic; I'm just thinking about operators upgrading and possibly forgetting to update this config value. If we leave the current name and somebody keeps the old value, then they will try to query the old QN, which will most likely lead to some weird GraphQL errors that may not be as readable and clear. If we update the config key, any operator that forgot to update their config will get a clear error about a missing config key. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think that is a good argument actually. |
||
joystreamNodeWs: ws://localhost:9944 | ||
directories: | ||
assets: ./local/data | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -31,7 +31,7 @@ | |||||
"express-winston": "^4.1.0", | ||||||
"fast-safe-stringify": "^2.1.1", | ||||||
"file-type": "^16.5.1", | ||||||
"graphql": "^14.7.0", | ||||||
"graphql": "^15.3.0", | ||||||
"graphql-tag": "^2.12.6", | ||||||
"inquirer": "^8.1.2", | ||||||
"js-image-generator": "^1.0.3", | ||||||
|
@@ -138,6 +138,7 @@ | |||||
"prepack": "rm -rf lib && tsc -b && oclif-dev manifest && generate:all", | ||||||
"test": "nyc --extension .ts mocha --forbid-only \"test/**/*.test.ts\"", | ||||||
"version": "generate:docs:cli && git add docs/cli/*", | ||||||
"generate:schema:graphql": "docker run --rm joystream/storage-squid:latest npm run get-graphql-schema > src/services/networking/query-node/schema.graphql", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In case docker run fails or the script doesn't connect, the resulting schema.graphql output file is left empty. This step is done manually and is not part of the automated build, but it would be better for it to fail more gracefully. I'm guessing that on my machine 5 seconds was not long enough for graphql-server to start up. Interactively it works fine.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have created a new shell script that will only write to output file if the returned schema response is not empty. Let me know if it's fine. Added in 3a6b0e4 |
||||||
"generate:types:json-schema": "yarn ts-node --transpile-only ./src/schemas/scripts/generateTypes.ts", | ||||||
"generate:types:graphql": "yarn graphql-codegen -c ./src/services/networking/query-node/codegen.yml", | ||||||
"generate:types:public-api": "yarn openapi-typescript ./src/api-spec/public.yml -o ./src/types/generated/PublicApi.ts -c ../prettierrc.js", | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,28 @@ | ||
import { ReadonlyConfig } from '../../types/config' | ||
import { QueryFetchPolicy, QueryNodeApi } from './query-node/api' | ||
import { Logger } from 'winston' | ||
import { LoggingService } from '../logging' | ||
import { StorageNodeApi } from './storage-node/api' | ||
import { StateCacheService } from '../cache/StateCacheService' | ||
import { DataObjectDetailsFragment } from './query-node/generated/queries' | ||
import axios from 'axios' | ||
import http from 'http' | ||
import https from 'https' | ||
import Mime from 'mime/lite' | ||
import queue from 'queue' | ||
import { Logger } from 'winston' | ||
import { | ||
StorageNodeEndpointData, | ||
DataObjectAccessPoints, | ||
DataObjectData, | ||
DataObjectInfo, | ||
StorageNodeDownloadResponse, | ||
DownloadData, | ||
StatusResponse, | ||
StorageNodeDownloadResponse, | ||
StorageNodeEndpointData, | ||
} from '../../types' | ||
import queue from 'queue' | ||
import { DistributionBucketOperatorStatus } from './query-node/generated/schema' | ||
import http from 'http' | ||
import https from 'https' | ||
import { ReadonlyConfig } from '../../types/config' | ||
import { StateCacheService } from '../cache/StateCacheService' | ||
import { LoggingService } from '../logging' | ||
import { parseAxiosError } from '../parsers/errors' | ||
import { PendingDownload, PendingDownloadStatusType } from './PendingDownload' | ||
import Mime from 'mime/lite' | ||
import { QueryFetchPolicy, QueryNodeApi } from './query-node/api' | ||
import { DataObjectDetailsFragment } from './query-node/generated/queries' | ||
import { DistributionBucketOperatorStatus } from './query-node/generated/schema' | ||
import { RuntimeApi } from './runtime/api' | ||
import { StorageNodeApi } from './storage-node/api' | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we discussed this once, but it is hard to see what imports were added or removed when they are also being re-ordered. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted the re-ordering change |
||
// Concurrency limits | ||
export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_OBJECT = 10 | ||
|
@@ -30,6 +31,7 @@ export const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10 | |
export class NetworkingService { | ||
private config: ReadonlyConfig | ||
private queryNodeApi: QueryNodeApi | ||
private runtimeApi!: RuntimeApi | ||
private logging: LoggingService | ||
private stateCache: StateCacheService | ||
private logger: Logger | ||
|
@@ -65,6 +67,12 @@ export class NetworkingService { | |
this.downloadQueue.on('error', (err) => { | ||
this.logger.error('Data object download failed', { err }) | ||
}) | ||
|
||
this.initRuntimeApi().catch((err) => this.logger.error('Runtime API initialization failed:', { err })) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose it is not critical for runtimeApi to be functioning, as only place i see it used is in the public api endpoint to get the status. But if the api is not initialized and the endpoint is called and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't think this is currently happening for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Was this added recently? Referencing my previous comment:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, actually it is handled and Argus doesn't crash, so I guess we just need a better error message in that case |
||
|
||
private async initRuntimeApi() { | ||
this.runtimeApi = await RuntimeApi.create(this.logging, this.config.endpoints.joystreamNodeWs) | ||
} | ||
|
||
private validateNodeEndpoint(endpoint: string): void { | ||
|
@@ -97,12 +105,12 @@ export class NetworkingService { | |
|
||
private prepareStorageNodeEndpoints(details: DataObjectDetailsFragment) { | ||
const endpointsData = details.storageBag.storageBuckets | ||
.filter((bucket) => bucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive') | ||
.map((bucket) => { | ||
const rootEndpoint = bucket.operatorMetadata?.nodeEndpoint | ||
.filter(({ storageBucket }) => storageBucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive') | ||
.map(({ storageBucket }) => { | ||
const rootEndpoint = storageBucket.operatorMetadata?.nodeEndpoint | ||
const apiEndpoint = rootEndpoint ? this.getApiEndpoint(rootEndpoint) : '' | ||
return { | ||
bucketId: bucket.id, | ||
bucketId: storageBucket.id, | ||
endpoint: apiEndpoint, | ||
} | ||
}) | ||
|
@@ -119,8 +127,8 @@ export class NetworkingService { | |
private getDataObjectActiveDistributorsSet(objectDetails: DataObjectDetailsFragment): Set<number> { | ||
const activeDistributorsSet = new Set<number>() | ||
const { distributionBuckets } = objectDetails.storageBag | ||
for (const bucket of distributionBuckets) { | ||
for (const operator of bucket.operators) { | ||
for (const { distributionBucket } of distributionBuckets) { | ||
for (const operator of distributionBucket.operators) { | ||
if (operator.status === DistributionBucketOperatorStatus.Active) { | ||
activeDistributorsSet.add(operator.workerId) | ||
} | ||
|
@@ -151,14 +159,16 @@ export class NetworkingService { | |
isSupported = typeof this.config.workerId === 'number' ? distributors.has(this.config.workerId) : false | ||
} else { | ||
const supportedBucketIds = this.config.buckets.map((id) => id.toString()) | ||
isSupported = details.storageBag.distributionBuckets.some((b) => supportedBucketIds.includes(b.id)) | ||
isSupported = details.storageBag.distributionBuckets.some((b) => | ||
supportedBucketIds.includes(b.distributionBucket.id) | ||
) | ||
} | ||
data = { | ||
objectId, | ||
accessPoints: this.parseDataObjectAccessPoints(details), | ||
contentHash: details.ipfsHash, | ||
size: parseInt(details.size), | ||
fallbackMimeType: this.parseUserProvidedMimeType(details.type.subtitle?.mimeType), | ||
fallbackMimeType: this.parseUserProvidedMimeType(details.type?.subtitle?.mimeType), | ||
} | ||
} | ||
|
||
|
@@ -384,7 +394,7 @@ export class NetworkingService { | |
contentHash: ipfsHash, | ||
objectId: id, | ||
size: parseInt(size), | ||
fallbackMimeType: this.parseUserProvidedMimeType(type.subtitle?.mimeType), | ||
fallbackMimeType: this.parseUserProvidedMimeType(type?.subtitle?.mimeType), | ||
}) | ||
}) | ||
|
||
|
@@ -437,16 +447,16 @@ export class NetworkingService { | |
} | ||
|
||
async getQueryNodeStatus(): Promise<StatusResponse['queryNodeStatus']> { | ||
const qnState = await this.queryNodeApi.getQueryNodeState() | ||
const squidStatus = await this.queryNodeApi.getQueryNodeState() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should try-catch this? Relating to https://github.com/Joystream/joystream/pull/5001/files#r1443286342 |
||
if (qnState === null) { | ||
this.logger.error("Couldn't fetch the state from connected query-node") | ||
if (squidStatus === null) { | ||
this.logger.error("Couldn't fetch the state from connected storage-squid") | ||
} | ||
|
||
return { | ||
url: this.config.endpoints.queryNode, | ||
chainHead: qnState?.chainHead || 0, | ||
blocksProcessed: qnState?.lastCompleteBlock || 0, | ||
chainHead: (await this.runtimeApi.derive.chain.bestNumber()).toNumber() || 0, | ||
blocksProcessed: squidStatus?.height || 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possibly worth try-catching as well so if |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,9 @@ | ||
import { | ||
ApolloClient, | ||
DocumentNode, | ||
FetchPolicy, | ||
HttpLink, | ||
NormalizedCacheObject, | ||
from, | ||
split, | ||
} from '@apollo/client/core' | ||
import { ApolloClient, DocumentNode, FetchPolicy, HttpLink, NormalizedCacheObject, from } from '@apollo/client/core' | ||
import { onError } from '@apollo/client/link/error' | ||
import { WebSocketLink } from '@apollo/client/link/ws' | ||
import { getMainDefinition } from '@apollo/client/utilities' | ||
import { InvalidationPolicyCache } from '@nerdwallet/apollo-cache-policies' | ||
import fetch from 'cross-fetch' | ||
import { FragmentDefinitionNode } from 'graphql' | ||
import { Logger } from 'winston' | ||
import ws from 'ws' | ||
import { ReadonlyConfig } from '../../../types' | ||
import { LoggingService } from '../../logging' | ||
import { | ||
|
@@ -35,11 +24,10 @@ import { | |
GetDistributionBucketsWithBagsByWorkerIdQuery, | ||
GetDistributionBucketsWithBagsByWorkerIdQueryVariables, | ||
MinimalDataObjectFragment, | ||
QueryNodeState, | ||
QueryNodeStateFields, | ||
QueryNodeStateFieldsFragment, | ||
QueryNodeStateSubscription, | ||
QueryNodeStateSubscriptionVariables, | ||
SquidStatus, | ||
SquidStatusFieldsFragment, | ||
SquidStatusQuery, | ||
SquidStatusQueryVariables, | ||
StorageBagWithObjectsFragment, | ||
StorageBucketOperatorFieldsFragment, | ||
} from './generated/queries' | ||
|
@@ -62,8 +50,6 @@ type PaginationQueryResult<T = unknown> = { | |
} | ||
} | ||
|
||
type CustomVariables<T> = Omit<T, keyof PaginationQueryVariables> | ||
|
||
export class QueryNodeApi { | ||
private config: ReadonlyConfig | ||
private apolloClient: ApolloClient<NormalizedCacheObject> | ||
|
@@ -79,33 +65,11 @@ export class QueryNodeApi { | |
exitOnError && process.exit(-1) | ||
}) | ||
|
||
const queryLink = from([errorLink, new HttpLink({ uri: this.config.endpoints.queryNode, fetch })]) | ||
const wsLink = new WebSocketLink({ | ||
uri: this.config.endpoints.queryNode, | ||
options: { | ||
reconnect: true, | ||
}, | ||
webSocketImpl: ws, | ||
}) | ||
const splitLink = split( | ||
({ query }) => { | ||
const definition = getMainDefinition(query) | ||
return definition.kind === 'OperationDefinition' && definition.operation === 'subscription' | ||
}, | ||
wsLink, | ||
queryLink | ||
) | ||
|
||
this.apolloClient = new ApolloClient({ | ||
link: splitLink, | ||
link: from([errorLink, new HttpLink({ uri: this.config.endpoints.queryNode, fetch })]), | ||
// Ref: https://www.apollographql.com/docs/react/api/core/ApolloClient/#assumeimmutableresults | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see where this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 thanks I guess it was hard to spot because of so many occurrences of the |
||
assumeImmutableResults: true, | ||
cache: new InvalidationPolicyCache({ | ||
typePolicies: { | ||
ProcessorState: { | ||
keyFields: (object) => object.__typename, | ||
}, | ||
}, | ||
invalidationPolicies: { | ||
types: { | ||
StorageDataObject: { | ||
|
@@ -126,7 +90,7 @@ export class QueryNodeApi { | |
query: DocumentNode, | ||
variables: VariablesT, | ||
resultKey: keyof QueryT, | ||
fetchPolicy: QueryFetchPolicy | ||
fetchPolicy: QueryFetchPolicy = 'no-cache' | ||
): Promise<Required<QueryT>[keyof QueryT] | null> { | ||
return ( | ||
(await this.apolloClient.query<QueryT, VariablesT>({ query, variables, fetchPolicy })).data[resultKey] || null | ||
|
@@ -266,30 +230,18 @@ export class QueryNodeApi { | |
} | ||
|
||
public getActiveStorageBucketOperatorsData(): Promise<StorageBucketOperatorFieldsFragment[]> { | ||
return this.multipleEntitiesWithPagination< | ||
StorageBucketOperatorFieldsFragment, | ||
return this.multipleEntitiesQuery< | ||
GetActiveStorageBucketOperatorsDataQuery, | ||
CustomVariables<GetActiveStorageBucketOperatorsDataQueryVariables> | ||
>(GetActiveStorageBucketOperatorsData, {}, 'storageBucketsConnection') | ||
GetActiveStorageBucketOperatorsDataQueryVariables | ||
>(GetActiveStorageBucketOperatorsData, {}, 'storageBuckets') | ||
} | ||
|
||
public async getQueryNodeState(): Promise<QueryNodeStateFieldsFragment | null> { | ||
// fetch cached state | ||
const cachedState = this.readFragment<QueryNodeStateFieldsFragment, QueryNodeStateSubscriptionVariables>( | ||
'ProcessorState', | ||
QueryNodeStateFields | ||
) | ||
|
||
// If we have the state in cache, return it | ||
if (cachedState) { | ||
return cachedState | ||
} | ||
|
||
// Otherwise setup the subscription (which will periodically update the cache) and return for the first result | ||
return this.uniqueEntitySubscription<QueryNodeStateSubscription, QueryNodeStateSubscriptionVariables>( | ||
QueryNodeState, | ||
public async getQueryNodeState(): Promise<SquidStatusFieldsFragment | null> { | ||
const squidStatus = await this.uniqueEntityQuery<SquidStatusQuery, SquidStatusQueryVariables>( | ||
SquidStatus, | ||
{}, | ||
'stateSubscription' | ||
'squidStatus' | ||
) | ||
return squidStatus | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PROCESSOR_PROMETHEUS_PORT
earlier in the file is also assigned 3337. But this is only a problem if the orion processor is also running on the same host.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 36212f0