Skip to content

Commit

Permalink
More Lemonade instrumentation (#2173)
Browse files Browse the repository at this point in the history
  • Loading branch information
robknight authored Nov 13, 2024
1 parent 0f2cfe5 commit 77cdd13
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,178 +781,145 @@ export class LemonadePipeline implements BasePipeline {
email: string,
identityCommitment: string
): Promise<EdDSATicketPCD[]> {
// Load atom-backed tickets
const relevantTickets = await this.db.loadByEmail(this.id, email);
return traced(LOG_NAME, "getTicketsForEmail", async () => {
// Load atom-backed tickets

// Load check-in data
const checkIns = await this.checkinDB.getByTicketIds(
client,
this.id,
relevantTickets.map((ticket) => ticket.id)
);
const checkInsById = _.keyBy(checkIns, (checkIn) => checkIn.ticketId);
const relevantTickets = await this.db.loadByEmail(this.id, email);

// Convert atoms to ticket data
const ticketDatas = relevantTickets.map((t) => {
if (checkInsById[t.id]) {
t.checkinDate = checkInsById[t.id].timestamp;
}
return this.atomToTicketData(t, identityCommitment);
});
// Load manual tickets from the definition
const manualTickets = this.getManualTicketsForEmail(email);
// Convert manual tickets to ticket data and add to array
ticketDatas.push(
...(await Promise.all(
manualTickets.map((manualTicket) =>
this.manualTicketToTicketData(
client,
manualTicket,
identityCommitment
)
// Load check-in data
const checkIns = await traced(LOG_NAME, "get checkins", async () =>
this.checkinDB.getByTicketIds(
client,
this.id,
relevantTickets.map((ticket) => ticket.id)
)
))
);
);
const checkInsById = _.keyBy(checkIns, (checkIn) => checkIn.ticketId);

// Turn ticket data into PCDs
const tickets = await Promise.all(
ticketDatas.map((t) => this.getOrGenerateTicket(t))
);
// Convert atoms to ticket data
const ticketDatas = relevantTickets.map((t) => {
if (checkInsById[t.id]) {
t.checkinDate = checkInsById[t.id].timestamp;
}
return this.atomToTicketData(t, identityCommitment);
});
// Load manual tickets from the definition
const manualTickets = this.getManualTicketsForEmail(email);
// Convert manual tickets to ticket data and add to array
ticketDatas.push(
...(await Promise.all(
manualTickets.map((manualTicket) =>
this.manualTicketToTicketData(
client,
manualTicket,
identityCommitment
)
)
))
);

return tickets;
// Turn ticket data into PCDs
const tickets = await Promise.all(
ticketDatas.map((t) => this.getOrGenerateTicket(t))
);

return tickets;
});
}

private async issueLemonadeTicketPCDs(
req: PollFeedRequest
): Promise<PollFeedResponseValue> {
return traced(LOG_NAME, "issueLemonadeTicketPCDs", async (span) => {
return sqlQueryWithPool(this.context.dbPool, async (client) => {
tracePipeline(this.definition);
tracePipeline(this.definition);

if (!req.pcd) {
throw new Error("missing credential pcd");
}
if (!req.pcd) {
throw new Error("missing credential pcd");
}

if (
this.definition.options.paused &&
!(await this.db.hasLoaded(this.id))
) {
return { actions: [] };
}
if (
this.definition.options.paused &&
!(await this.db.hasLoaded(this.id))
) {
return { actions: [] };
}

const credential =
await this.credentialSubservice.verifyAndExpectZupassEmail(req.pcd);
const credential =
await this.credentialSubservice.verifyAndExpectZupassEmail(req.pcd);

const { emails, semaphoreId } = credential;
const { emails, semaphoreId } = credential;

if (!emails || emails.length === 0) {
throw new Error("missing emails in credential");
}
if (!emails || emails.length === 0) {
throw new Error("missing emails in credential");
}

span?.setAttribute("emails", emails.map((e) => e.email).join(","));
span?.setAttribute("semaphore_id", semaphoreId);

// let didUpdate = false;
// for (const email of emails) {
// // Consumer is validated, so save them in the consumer list
// didUpdate =
// didUpdate ||
// (await this.consumerDB.save(
// client,
// this.id,
// email.email,
// semaphoreId,
// new Date()
// ));
// }

// if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
// // If the user's Semaphore commitment has changed, `didUpdate` will be
// // true, and we need to update the Semaphore groups
// if (didUpdate) {
// span?.setAttribute("semaphore_groups_updated", true);
// await this.triggerSemaphoreGroupUpdate(client);
// }
// }

const tickets = (
await Promise.all(
emails.map((e) =>
this.getTicketsForEmail(client, e.email, semaphoreId)
span?.setAttribute("emails", emails.map((e) => e.email).join(","));
span?.setAttribute("semaphore_id", semaphoreId);

// let didUpdate = false;
// for (const email of emails) {
// // Consumer is validated, so save them in the consumer list
// didUpdate =
// didUpdate ||
// (await this.consumerDB.save(
// client,
// this.id,
// email.email,
// semaphoreId,
// new Date()
// ));
// }

// if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
// // If the user's Semaphore commitment has changed, `didUpdate` will be
// // true, and we need to update the Semaphore groups
// if (didUpdate) {
// span?.setAttribute("semaphore_groups_updated", true);
// await this.triggerSemaphoreGroupUpdate(client);
// }
// }

const tickets = await sqlQueryWithPool(
this.context.dbPool,
async (client) => {
return (
await Promise.all(
emails.map((e) =>
this.getTicketsForEmail(client, e.email, semaphoreId)
)
)
)
).flat();

const ticketActions: PCDAction[] = [];

if (await this.db.hasLoaded(this.id)) {
ticketActions.push({
type: PCDActionType.DeleteFolder,
folder: this.definition.options.feedOptions.feedFolder,
recursive: true
});
).flat();
}
);

const ticketPCDs = await Promise.all(
tickets.map((t) => EdDSATicketPCDPackage.serialize(t))
);
const ticketActions: PCDAction[] = [];

if (await this.db.hasLoaded(this.id)) {
ticketActions.push({
type: PCDActionType.ReplaceInFolder,
type: PCDActionType.DeleteFolder,
folder: this.definition.options.feedOptions.feedFolder,
pcds: ticketPCDs
recursive: true
});
}

const contactsFolder = `${this.definition.options.feedOptions.feedFolder}/contacts`;
const contacts = (
await Promise.all(
emails.map((e) => this.getReceivedContactsForEmail(client, e.email))
)
).flat();
const contactActions: PCDAction[] = [
{
type: PCDActionType.DeleteFolder,
folder: contactsFolder,
recursive: true
},
{
type: PCDActionType.ReplaceInFolder,
folder: contactsFolder,
pcds: contacts
}
];

const badgeFolder = `${this.definition.options.feedOptions.feedFolder}/badges`;

const badges = (
await Promise.all(
emails.map((e) => this.getReceivedBadgesForEmail(client, e.email))
)
).flat();
const badgeActions: PCDAction[] = [
{
type: PCDActionType.DeleteFolder,
folder: badgeFolder,
recursive: true
},
{
type: PCDActionType.ReplaceInFolder,
folder: badgeFolder,
pcds: badges
}
];
const ticketPCDs = await traced(LOG_NAME, "serialize tickets", async () =>
Promise.all(tickets.map((t) => EdDSATicketPCDPackage.serialize(t)))
);

traceFlattenedObject(span, {
pcds_issued: tickets.length + badges.length + contacts.length,
tickets_issued: tickets.length,
badges_issued: badges.length,
contacts_issued: contacts.length
});
ticketActions.push({
type: PCDActionType.ReplaceInFolder,
folder: this.definition.options.feedOptions.feedFolder,
pcds: ticketPCDs
});

return {
actions: [...ticketActions, ...contactActions, ...badgeActions]
};
traceFlattenedObject(span, {
pcds_issued: tickets.length,
tickets_issued: tickets.length
});

return {
actions: [...ticketActions]
};
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { LRUCache } from "lru-cache";
import { Pool } from "postgres-pool";
import { loadZupassEdDSAPublicKey } from "../../issuanceService";
import { traced } from "../../telemetryService";

/**
* Manages server-side verification of credential PCDs.
Expand Down Expand Up @@ -62,26 +63,35 @@ export class CredentialSubservice {
public async verifyAndExpectZupassEmail(
credential: Credential
): Promise<VerifiedCredential> {
const verifiedCredential = await this.verify(credential);
return traced(
"CredentialSubservice",
"verifyAndExpectZupassEmail",
async () => {
const verifiedCredential = await this.verify(credential);

if (!verifiedCredential.emails || verifiedCredential.emails.length === 0) {
throw new VerificationError("Missing Email PCDs");
}
if (
!verifiedCredential.emails ||
verifiedCredential.emails.length === 0
) {
throw new VerificationError("Missing Email PCDs");
}

for (const signedEmail of verifiedCredential.emails) {
const { email, semaphoreId, signer } = signedEmail;
for (const signedEmail of verifiedCredential.emails) {
const { email, semaphoreId, signer } = signedEmail;

if (!email || !semaphoreId) {
throw new VerificationError("Missing email PCD in credential");
}
if (!verifiedCredential.authKey && !this.isZupassPublicKey(signer)) {
throw new VerificationError(
`Email PCD not signed by Zupass. expected ${this.zupassPublicKey} but got ${signer}`
);
}
}
if (!email || !semaphoreId) {
throw new VerificationError("Missing email PCD in credential");
}
if (!verifiedCredential.authKey && !this.isZupassPublicKey(signer)) {
throw new VerificationError(
`Email PCD not signed by Zupass. expected ${this.zupassPublicKey} but got ${signer}`
);
}
}

return { ...verifiedCredential, emails: verifiedCredential.emails };
return { ...verifiedCredential, emails: verifiedCredential.emails };
}
);
}

/**
Expand Down

0 comments on commit 77cdd13

Please sign in to comment.