项目作者: mohan-chinnappan-n

项目描述 :
GCP PubSub Samples
高级语言: JavaScript
项目地址: git://github.com/mohan-chinnappan-n/gcp-pubsub.git
创建时间: 2020-03-16T20:58:27Z
项目社区:https://github.com/mohan-chinnappan-n/gcp-pubsub

开源协议:

下载


Sample code for GCP pubsub

Sequence diagram

Option-1

Seq img

  • Code to use GCP pubsub
    const fetch = require("node-fetch");

    /**
     * Creates a Salesforce Case through the REST sObject endpoint.
     *
     * @param {string} subject - Value sent as the Case "Subject" field.
     * @param {string} at - OAuth access token, sent as a Bearer token.
     * @param {string} priority - Value sent as the Case "Priority" field.
     * @returns {Promise<object|undefined>} The parsed JSON response on success;
     *   undefined when the request fails (the failure is logged, not thrown).
     */
    async function createCase(subject, at, priority) {
      const sobj = 'Case';
      // TODO: change the url as required
      const url = `https://mohansun-fsc-21.my.salesforce.com/services/data/v46.0/sobjects/${sobj}`;

      // TODO: design the fields needed for the case creation and get it a part of the message data
      const postbody = { "Subject": `${subject}`, "Priority": `${priority}` };

      const params = {
        method: "POST",
        mode: "cors",
        headers: { "Content-Type": "application/json", "Authorization": `Bearer ${at}` },
        body: JSON.stringify(postbody)
      };

      try {
        const response = await fetch(url, params);
        // fetch only rejects on network errors; HTTP 4xx/5xx must be checked explicitly.
        if (!response.ok) {
          throw new Error(`HTTP ${response.status} ${response.statusText}`);
        }
        return await response.json();
      } catch (err) {
        // Best-effort, as in the original sample: log the failure and return undefined.
        console.log('Fetch ERROR!' + err);
        return undefined;
      }
    }

// Name of the existing Pub/Sub subscription to listen on.
const subscriptionName = 'CV19_CASE';
// How long to keep listening; multiplied by 1000 where it is passed to setTimeout.
const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on the `subscriptionName` subscription, creates a Salesforce Case
 * for every message received, and stops listening after `timeout` seconds.
 */
function listenForMessages() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;

  // The handler is async so the Case creation can be awaited before the ack.
  const messageHandler = async (message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    try {
      // message.data is a Buffer, so it must be decoded before fields can be read.
      // NOTE(review): assumes the publisher sends a JSON object with
      // `subject`, `at` and `priority` keys - confirm against the publisher.
      const { subject, at, priority } = JSON.parse(message.data.toString());

      // CREATE Case here
      await createCase(subject, at, priority);

      // "Ack" (acknowledge receipt of) the message
      message.ack();
    } catch (err) {
      // An async handler must not reject unobserved; log and hand the message
      // back to Pub/Sub for redelivery instead of acknowledging it.
      console.log(`Failed to process message ${message.id}: ${err}`);
      message.nack();
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

listenForMessages();

  ### Option-2

  ![Seq img2](docs/img/seq-2.png)

  ### Slides

  [SALESFORCE BULK API 2.0 NPM](https://mohan-chinnappan-n2.github.io/2020/bulkapi2/bulkapi2.html#0)

  ```js
  // filename:index.js
  // test file for sfbulk2js
  // author: mohan chinnappan (mar-18-2020)
  const sfb2 = require('sfbulk2js'); // the npm package we just installed
  const fs = require('fs');
  const process = require('process');

  // read access-token from the env
  const AT = process.env.AT;

  // Connection and job settings passed to the sfbulk2js calls below.
  const cji = {
    instanceUrl: 'https://mohansun-fsc-21.my.salesforce.com',
    apiVersion: 'v46.0',
    accessToken: `${AT}`,
    contentType: 'CSV',
    lineEnding: 'LF'
  };

  // Delay between job-status polls, in milliseconds.
  const waitTimeMs = 5000;
  /**
   * Logs 'WAITING' and returns a promise that resolves after a delay.
   *
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>} Resolves (with no value) once the timer fires.
   */
  function sleep(ms) {
    console.log('WAITING');
    return new Promise(resolve => setTimeout(resolve, ms));
  }
  /**
   * Runs a Bulk API 2.0 insert of Case records from a CSV file: creates the
   * job, uploads the data, marks the upload complete, polls until the job is
   * no longer pending, then logs the failure and unprocessed-record results.
   * Errors are caught and logged; nothing is thrown to the caller.
   *
   * @param {string} datafile - Path of the CSV file to upload.
   * @returns {Promise<void>}
   */
  async function dataload(datafile) {
    // States in which the job has not finished yet. Right after the
    // 'UploadComplete' patch the job can still report 'UploadComplete'
    // before it moves to 'InProgress', so both must keep the poll going.
    const pendingStates = ['UploadComplete', 'InProgress'];

    // Fetches the ingest-job status for the given job id.
    const fetchJobStatus = (jobId) =>
      sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', jobId);

    // Fetches the job status, logs it with the usual banners, and returns it.
    const reportJobStatus = async (jobId) => {
      console.log(`=== JOB STATUS === `);
      const status = await fetchJobStatus(jobId);
      console.log(`=== JOB STATUS for job: ${jobId} ===`);
      console.log(status);
      return status;
    };

    try {
      console.log(`=== CREATE JOB === `);
      const job = await sfb2.createJob(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'insert', 'Case', cji.contentType, cji.lineEnding);
      console.log(job);
      console.log(`jobId: ${job.id}`);

      await reportJobStatus(job.id);

      console.log(`=== PUT DATA === `);
      const fdata = fs.readFileSync(datafile, 'utf8');
      await sfb2.putData(cji.instanceUrl, cji.accessToken, job.contentUrl, fdata);

      await reportJobStatus(job.id);

      console.log(`=== PATCH STATE === `);
      const patchDataStatus = await sfb2.patchState(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id, 'UploadComplete');
      console.log(patchDataStatus);

      let jobStatus = await reportJobStatus(job.id);

      while (pendingStates.includes(jobStatus.state)) { // wait for it
        await sleep(waitTimeMs);
        jobStatus = await fetchJobStatus(job.id);
        console.log(jobStatus);
      }

      console.log(`=== JOB Failure STATUS === `);
      const failureStatus = await sfb2.getJobFailureStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id);
      console.log(`=== JOB Failure STATUS for job: ${job.id} ===`);
      console.log(failureStatus);

      console.log(`=== JOB getUnprocessedRecords STATUS === `);
      const unprocessedRecords = await sfb2.getUnprocessedRecords(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id);
      console.log(`=== JOB getUnprocessedRecords STATUS for job: ${job.id} ===`);
      console.log(unprocessedRecords);
    } catch (err) {
      console.log(`ERROR in dataload : ${err}`);
    }
  }

  // here we run it
  dataload('input.csv');