Skip to content

Commit 2320c4e

Browse files
authored
Merge pull request #27 from EYBlockchain/aleandrom/TAX-2490-support-trackingID-for-breadcrumbs
feat: rabbitmq reconnection logic & support for trackingIDs
2 parents a901257 + 155ed68 commit 2320c4e

File tree

5 files changed

+106
-69
lines changed

5 files changed

+106
-69
lines changed

src/queues/generateProof.mjs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import rabbitmq from '../utils/rabbitmq.mjs';
22
import logger from '../utils/logger.mjs';
33
import generateProof from '../services/generateProof.mjs';
4+
import { formatTrackingID } from '../utils/formatter.mjs';
45

56
export default function receiveMessage() {
67
rabbitmq.receiveMessage('generate-proof', async message => {
@@ -11,17 +12,21 @@ export default function receiveMessage() {
1112
data: null,
1213
};
1314

15+
let trackingID = '';
1416
try {
15-
response.data = await generateProof(JSON.parse(
16-
message.content.toString(),
17-
));
17+
trackingID = JSON.parse(message.content.toString()).trackingID;
18+
response.data = await generateProof(JSON.parse(message.content.toString()));
1819
} catch (err) {
19-
logger.error('Error in generate-proof', err);
20-
response.error = 'Proof generation failed';
20+
logger.error(`${formatTrackingID(trackingID)} Error in generate-proof:`, err);
21+
response.error = `${formatTrackingID(trackingID)} Proof generation failed`;
2122
}
2223

2324

2425
rabbitmq.sendMessage(replyTo, response, { correlationId });
25-
rabbitmq.sendACK(message);
26+
try {
27+
rabbitmq.sendACK(message);
28+
} catch (error) {
29+
logger.warn(`The acknowledgement failed, the channel was closed: ${error}`);
30+
}
2631
});
2732
}

src/services/generateKeys.mjs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import fs from 'fs';
22
import path from 'path';
33
import zokrates from '@eyblockchain/zokrates-zexe.js';
4-
import rabbitmq from '../utils/rabbitmq.mjs';
54
import logger from '../utils/logger.mjs';
65

76
export default async function({

src/services/generateProof.mjs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import zokrates from '@eyblockchain/zokrates-zexe.js';
55
import path from 'path';
66
import { getProofFromFile } from '../utils/filing.mjs';
77
import logger from '../utils/logger.mjs';
8+
import { formatTrackingID } from '../utils/formatter.mjs';
89

910
const unlink = util.promisify(fs.unlink);
1011

@@ -16,6 +17,7 @@ export default async function ({
1617
proofFileName,
1718
backend = 'ark', // zexe backend now named ark
1819
provingScheme = 'gm17',
20+
trackingID,
1921
}) {
2022
const outputPath = `./output`;
2123
let proof, publicInputs;
@@ -29,11 +31,11 @@ export default async function ({
2931
const proofJsonFile = `${circuitName}_${fileNamePrefix}_proof.json`;
3032

3133
if (fs.existsSync(`${outputPath}/${folderpath}/${witnessFile}`)) {
32-
throw Error('Witness file with same name exists');
34+
throw Error(`${formatTrackingID(trackingID)} Witness file with same name exists`);
3335
}
3436

3537
if (fs.existsSync(`${outputPath}/${folderpath}/${proofJsonFile}`)) {
36-
throw Error('proof.json file with same name exists');
38+
throw Error(`${formatTrackingID(trackingID)} proof.json file with same name exists`);
3739
}
3840

3941
const opts = {};
@@ -42,15 +44,15 @@ export default async function ({
4244
opts.fileName = proofFileName || `${proofJsonFile}`;
4345

4446
try {
45-
logger.info('Compute witness...');
47+
logger.info(`${formatTrackingID(trackingID)} Compute witness...`);
4648
await zokrates.computeWitness(
4749
`${outputPath}/${folderpath}/${circuitName}_out`,
4850
`${outputPath}/${folderpath}/`,
4951
`${witnessFile}`,
5052
inputs,
5153
);
5254

53-
logger.info('Generate proof...');
55+
logger.info(`${formatTrackingID(trackingID)} Generate proof...`);
5456
await zokrates.generateProof(
5557
`${outputPath}/${folderpath}/${circuitName}_pk.key`,
5658
`${outputPath}/${folderpath}/${circuitName}_out`,
@@ -63,7 +65,7 @@ export default async function ({
6365
({ proof, inputs: publicInputs } = await getProofFromFile(`${folderpath}/${proofJsonFile}`));
6466

6567
logger.info(`Complete`);
66-
logger.debug(`Responding with proof and inputs:`);
68+
logger.debug(`${formatTrackingID(trackingID)}Responding with proof and inputs:`);
6769
logger.debug(publicInputs);
6870
} finally {
6971
try {

src/utils/formatter.mjs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export const formatTrackingID = trackingID => {
2+
return trackingID ? `[${trackingID}] - ` : '';
3+
};

src/utils/rabbitmq.mjs

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,89 @@
11
import amqp from 'amqplib';
2+
import queues from '../queues/index.mjs';
3+
import logger from '../utils/logger.mjs';
4+
5+
let connection;
6+
let channel;
7+
8+
// connect to RabbitMQ server.
9+
const connect = async () => {
10+
logger.info('[AMQP] Connecting...');
11+
connection = await amqp.connect(`${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`);
12+
connection.on('error', err => {
13+
if (err.message !== 'Connection closing') {
14+
logger.error(`[AMQP] Connection error: ${err.message}`);
15+
return setTimeout(connect, 1000);
16+
}
17+
});
18+
connection.on('close', () => {
19+
logger.debug('[AMQP] Reconnecting');
20+
return setTimeout(connect, 1000);
21+
});
22+
channel = await connection.createChannel();
23+
channel.prefetch(1);
24+
queues();
25+
};
26+
27+
// publish message to a queue
28+
const sendMessage = async (queue, data, options = {}) => {
29+
channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), options);
30+
};
31+
32+
// only called from test-suite ./test/queue.mjs
33+
const cancelChannelConsume = consumerTag => {
34+
channel.cancel(consumerTag);
35+
};
36+
37+
const sendACK = message => {
38+
channel.ack(message);
39+
};
40+
41+
// only called from test-suite ./test/queue.mjs
42+
const sendNACK = message => {
43+
channel.nack(message);
44+
};
45+
46+
/*
47+
* Consumer: receive message from a queue
48+
*/
49+
const receiveMessage = async (queue, callback) => {
50+
await channel.assertQueue(queue);
51+
channel.consume(queue, callback, { noAck: false });
52+
};
53+
54+
// only called from test-suite ./test/queue.mjs
55+
const listenToReplyQueue = async (queue, correlationId, callback) => {
56+
logger.info(`[AMQP] Listening to reply queue: ${queue}`);
57+
receiveMessage(queue, message => {
58+
if (message.properties.correlationId !== correlationId) {
59+
logger.debug(`[AMQP] Sending NACK due to different correlation id: ${JSON.stringify({ message: message.properties.correlationId, param: correlationId })}`);
60+
return sendNACK(message);
61+
}
62+
cancelChannelConsume(message.fields.consumerTag);
63+
sendACK(message);
64+
65+
const response = JSON.parse(message.content.toString());
66+
response.type = message.properties.type;
67+
68+
return callback(response);
69+
});
70+
};
71+
72+
// only called from test-suite ./test/queue.mjs
73+
const close = async () => {
74+
await channel.close();
75+
await connection.close();
76+
};
277

378
export default {
4-
// connect to RabbitMQ server.
5-
async connect() {
6-
this.connection = await amqp.connect(
7-
`${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`,
8-
);
9-
this.channel = await this.connection.createChannel();
10-
this.channel.prefetch(1);
11-
},
12-
13-
// publish message to a queue
14-
async sendMessage(queue, data, options = {}) {
15-
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), options);
16-
},
17-
18-
// only called from test-suite ./test/queue.mjs
19-
cancelChannelConsume(consumerTag) {
20-
this.channel.cancel(consumerTag);
21-
},
22-
23-
sendACK(message) {
24-
this.channel.ack(message);
25-
},
26-
27-
// only called from test-suite ./test/queue.mjs
28-
sendNACK(message) {
29-
this.channel.nack(message);
30-
},
31-
32-
/*
33-
* Consumer: receive message from a queue
34-
*/
35-
async receiveMessage(queue, callback) {
36-
await this.channel.assertQueue(queue);
37-
this.channel.consume(queue, callback);
38-
},
39-
40-
// only called from test-suite ./test/queue.mjs
41-
listenToReplyQueue(queue, correlationId, callback) {
42-
this.receiveMessage(queue, message => {
43-
if (message.properties.correlationId !== correlationId) {
44-
return this.sendNACK(message);
45-
}
46-
this.cancelChannelConsume(message.fields.consumerTag);
47-
this.sendACK(message);
48-
49-
const response = JSON.parse(message.content.toString());
50-
response.type = message.properties.type;
51-
52-
return callback(response);
53-
});
54-
},
55-
56-
// only called from test-suite ./test/queue.mjs
57-
async close() {
58-
await this.channel.close();
59-
await this.connection.close();
60-
},
79+
connection,
80+
channel,
81+
connect,
82+
sendMessage,
83+
cancelChannelConsume,
84+
sendACK,
85+
sendNACK,
86+
receiveMessage,
87+
listenToReplyQueue,
88+
close,
6189
};

0 commit comments

Comments
 (0)