Skip to content

Commit

Permalink
add bulk update, upsert, and delete
Browse files Browse the repository at this point in the history
  • Loading branch information
k-capehart committed Apr 17, 2024
1 parent 408c759 commit 5cd1291
Showing 1 changed file with 138 additions and 26 deletions.
164 changes: 138 additions & 26 deletions salesforce/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@ import (
"github.com/gocarina/gocsv"
)

type BulkJob struct {
Object string `json:"object"`
Operation string `json:"operation"`
type BulkJobCreationRequest struct {
Object string `json:"object"`
Operation string `json:"operation"`
ExternalIdFieldName string `json:"externalIdFieldName"`
}

type BulkJobCreationResponse struct {
type BulkJob struct {
Id string `json:"id"`
State string `json:"state"`
}

type BulkJobState struct {
State string `json:"state"`
}

type BulkJobData struct {
Data string `json:"data"`
}
Expand All @@ -33,7 +30,7 @@ const (
)

func updateJobState(jobId string, state string, auth Auth) error {
abortJob := BulkJobState{State: state}
abortJob := BulkJob{State: state}
abortBody, _ := json.Marshal(abortJob)
_, abortErr := doRequest("PATCH", "/jobs/ingest/"+jobId, JSONType, auth, string(abortBody))
if abortErr != nil {
Expand All @@ -42,7 +39,7 @@ func updateJobState(jobId string, state string, auth Auth) error {
return nil
}

func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) {
func createBulkJob(auth Auth, body []byte) (*BulkJob, error) {
resp, err := doRequest("POST", "/jobs/ingest", JSONType, auth, string(body))
if err != nil {
return nil, err
Expand All @@ -56,7 +53,7 @@ func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) {
return nil, readErr
}

bulkJob := &BulkJobCreationResponse{}
bulkJob := &BulkJob{}
jsonError := json.Unmarshal(respBody, bulkJob)
if jsonError != nil {
return nil, jsonError
Expand All @@ -65,6 +62,31 @@ func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) {
return bulkJob, nil
}

func uploadJobData(auth Auth, records any, bulkJob BulkJob) error {
sObjects := records
csvContent, csvErr := gocsv.MarshalString(sObjects)
if csvErr != nil {
updateJobState(bulkJob.Id, JobStateAborted, auth)
return csvErr
}

resp, uploadDataErr := doRequest("PUT", "/jobs/ingest/"+bulkJob.Id+"/batches", CSVType, auth, csvContent)
if uploadDataErr != nil {
updateJobState(bulkJob.Id, JobStateAborted, auth)
return uploadDataErr
}
if resp.StatusCode != http.StatusCreated {
updateJobState(bulkJob.Id, JobStateAborted, auth)
return processSalesforceError(*resp)
}
stateErr := updateJobState(bulkJob.Id, JobStateUploadComplete, auth)
if stateErr != nil {
return stateErr
}

return nil
}

func (sf *Salesforce) InsertBulk(sObjectName string, records any) error {
authErr := validateAuth(*sf)
if authErr != nil {
Expand All @@ -75,7 +97,7 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any) error {
return typErr
}

job := BulkJob{
job := BulkJobCreationRequest{
Object: sObjectName,
Operation: "insert",
}
Expand All @@ -92,25 +114,115 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any) error {
return errors.New("error creating bulk data job. Id does not exist or job closed prematurely")
}

sObjects := records
csvContent, csvErr := gocsv.MarshalString(sObjects)
if csvErr != nil {
updateJobState(bulkJob.Id, JobStateAborted, *sf.auth)
return csvErr
uploadErr := uploadJobData(*sf.auth, records, *bulkJob)
if uploadErr != nil {
return uploadErr
}

resp, uploadDataErr := doRequest("PUT", "/jobs/ingest/"+bulkJob.Id+"/batches", CSVType, *sf.auth, csvContent)
if uploadDataErr != nil {
updateJobState(bulkJob.Id, JobStateAborted, *sf.auth)
return nil
}

func (sf *Salesforce) UpdateBulk(sObjectName string, records any) error {
authErr := validateAuth(*sf)
if authErr != nil {
return authErr
}
typErr := validateOfTypeSlice(records)
if typErr != nil {
return typErr
}

job := BulkJobCreationRequest{
Object: sObjectName,
Operation: "update",
}
body, jsonError := json.Marshal(job)
if jsonError != nil {
return jsonError
}

bulkJob, err := createBulkJob(*sf.auth, body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusCreated {
updateJobState(bulkJob.Id, JobStateAborted, *sf.auth)
return processSalesforceError(*resp)
if bulkJob.Id == "" || bulkJob.State != "Open" {
return errors.New("error creating bulk data job. Id does not exist or job closed prematurely")
}
stateErr := updateJobState(bulkJob.Id, JobStateUploadComplete, *sf.auth)
if stateErr != nil {
return stateErr

uploadErr := uploadJobData(*sf.auth, records, *bulkJob)
if uploadErr != nil {
return uploadErr
}

return nil
}

func (sf *Salesforce) UpsertBulk(sObjectName string, fieldName string, records any) error {
authErr := validateAuth(*sf)
if authErr != nil {
return authErr
}
typErr := validateOfTypeSlice(records)
if typErr != nil {
return typErr
}

job := BulkJobCreationRequest{
Object: sObjectName,
Operation: "upsert",
ExternalIdFieldName: fieldName,
}
body, jsonError := json.Marshal(job)
if jsonError != nil {
return jsonError
}

bulkJob, err := createBulkJob(*sf.auth, body)
if err != nil {
return err
}
if bulkJob.Id == "" || bulkJob.State != "Open" {
return errors.New("error creating bulk data job. Id does not exist or job closed prematurely")
}

uploadErr := uploadJobData(*sf.auth, records, *bulkJob)
if uploadErr != nil {
return uploadErr
}

return nil
}

func (sf *Salesforce) DeleteBulk(sObjectName string, records any) error {
authErr := validateAuth(*sf)
if authErr != nil {
return authErr
}
typErr := validateOfTypeSlice(records)
if typErr != nil {
return typErr
}

job := BulkJobCreationRequest{
Object: sObjectName,
Operation: "delete",
}
body, jsonError := json.Marshal(job)
if jsonError != nil {
return jsonError
}

bulkJob, err := createBulkJob(*sf.auth, body)
if err != nil {
return err
}
if bulkJob.Id == "" || bulkJob.State != "Open" {
return errors.New("error creating bulk data job. Id does not exist or job closed prematurely")
}

uploadErr := uploadJobData(*sf.auth, records, *bulkJob)
if uploadErr != nil {
return uploadErr
}

return nil
Expand Down

0 comments on commit 5cd1291

Please sign in to comment.