diff --git a/salesforce/bulk.go b/salesforce/bulk.go index c5b4a27..b2e9274 100644 --- a/salesforce/bulk.go +++ b/salesforce/bulk.go @@ -9,20 +9,17 @@ import ( "github.com/gocarina/gocsv" ) -type BulkJob struct { - Object string `json:"object"` - Operation string `json:"operation"` +type BulkJobCreationRequest struct { + Object string `json:"object"` + Operation string `json:"operation"` + ExternalIdFieldName string `json:"externalIdFieldName"` } -type BulkJobCreationResponse struct { +type BulkJob struct { Id string `json:"id"` State string `json:"state"` } -type BulkJobState struct { - State string `json:"state"` -} - type BulkJobData struct { Data string `json:"data"` } @@ -33,7 +30,7 @@ const ( ) func updateJobState(jobId string, state string, auth Auth) error { - abortJob := BulkJobState{State: state} + abortJob := BulkJob{State: state} abortBody, _ := json.Marshal(abortJob) _, abortErr := doRequest("PATCH", "/jobs/ingest/"+jobId, JSONType, auth, string(abortBody)) if abortErr != nil { @@ -42,7 +39,7 @@ func updateJobState(jobId string, state string, auth Auth) error { return nil } -func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) { +func createBulkJob(auth Auth, body []byte) (*BulkJob, error) { resp, err := doRequest("POST", "/jobs/ingest", JSONType, auth, string(body)) if err != nil { return nil, err @@ -56,7 +53,7 @@ func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) { return nil, readErr } - bulkJob := &BulkJobCreationResponse{} + bulkJob := &BulkJob{} jsonError := json.Unmarshal(respBody, bulkJob) if jsonError != nil { return nil, jsonError @@ -65,6 +62,31 @@ func createBulkJob(auth Auth, body []byte) (*BulkJobCreationResponse, error) { return bulkJob, nil } +func uploadJobData(auth Auth, records any, bulkJob BulkJob) error { + sObjects := records + csvContent, csvErr := gocsv.MarshalString(sObjects) + if csvErr != nil { + updateJobState(bulkJob.Id, JobStateAborted, auth) + return csvErr + } + + resp, uploadDataErr := doRequest("PUT", "/jobs/ingest/"+bulkJob.Id+"/batches", CSVType, auth, csvContent) + if uploadDataErr != nil { + updateJobState(bulkJob.Id, JobStateAborted, auth) + return uploadDataErr + } + if resp.StatusCode != http.StatusCreated { + updateJobState(bulkJob.Id, JobStateAborted, auth) + return processSalesforceError(*resp) + } + stateErr := updateJobState(bulkJob.Id, JobStateUploadComplete, auth) + if stateErr != nil { + return stateErr + } + + return nil +} + func (sf *Salesforce) InsertBulk(sObjectName string, records any) error { authErr := validateAuth(*sf) if authErr != nil { @@ -75,7 +97,7 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any) error { return typErr } - job := BulkJob{ + job := BulkJobCreationRequest{ Object: sObjectName, Operation: "insert", } @@ -92,25 +114,115 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any) error { return errors.New("error creating bulk data job. Id does not exist or job closed prematurely") } - sObjects := records - csvContent, csvErr := gocsv.MarshalString(sObjects) - if csvErr != nil { - updateJobState(bulkJob.Id, JobStateAborted, *sf.auth) - return csvErr + uploadErr := uploadJobData(*sf.auth, records, *bulkJob) + if uploadErr != nil { + return uploadErr } - resp, uploadDataErr := doRequest("PUT", "/jobs/ingest/"+bulkJob.Id+"/batches", CSVType, *sf.auth, csvContent) - if uploadDataErr != nil { - updateJobState(bulkJob.Id, JobStateAborted, *sf.auth) + return nil +} + +func (sf *Salesforce) UpdateBulk(sObjectName string, records any) error { + authErr := validateAuth(*sf) + if authErr != nil { + return authErr + } + typErr := validateOfTypeSlice(records) + if typErr != nil { + return typErr + } + + job := BulkJobCreationRequest{ + Object: sObjectName, + Operation: "update", + } + body, jsonError := json.Marshal(job) + if jsonError != nil { + return jsonError + } + + bulkJob, err := createBulkJob(*sf.auth, body) + if err != nil { return err } - if resp.StatusCode != http.StatusCreated { - updateJobState(bulkJob.Id, JobStateAborted, *sf.auth) - return processSalesforceError(*resp) + if bulkJob.Id == "" || bulkJob.State != "Open" { + return errors.New("error creating bulk data job. Id does not exist or job closed prematurely") } - stateErr := updateJobState(bulkJob.Id, JobStateUploadComplete, *sf.auth) - if stateErr != nil { - return stateErr + + uploadErr := uploadJobData(*sf.auth, records, *bulkJob) + if uploadErr != nil { + return uploadErr + } + + return nil +} + +func (sf *Salesforce) UpsertBulk(sObjectName string, fieldName string, records any) error { + authErr := validateAuth(*sf) + if authErr != nil { + return authErr + } + typErr := validateOfTypeSlice(records) + if typErr != nil { + return typErr + } + + job := BulkJobCreationRequest{ + Object: sObjectName, + Operation: "upsert", + ExternalIdFieldName: fieldName, + } + body, jsonError := json.Marshal(job) + if jsonError != nil { + return jsonError + } + + bulkJob, err := createBulkJob(*sf.auth, body) + if err != nil { + return err + } + if bulkJob.Id == "" || bulkJob.State != "Open" { + return errors.New("error creating bulk data job. Id does not exist or job closed prematurely") + } + + uploadErr := uploadJobData(*sf.auth, records, *bulkJob) + if uploadErr != nil { + return uploadErr + } + + return nil +} + +func (sf *Salesforce) DeleteBulk(sObjectName string, records any) error { + authErr := validateAuth(*sf) + if authErr != nil { + return authErr + } + typErr := validateOfTypeSlice(records) + if typErr != nil { + return typErr + } + + job := BulkJobCreationRequest{ + Object: sObjectName, + Operation: "delete", + } + body, jsonError := json.Marshal(job) + if jsonError != nil { + return jsonError + } + + bulkJob, err := createBulkJob(*sf.auth, body) + if err != nil { + return err + } + if bulkJob.Id == "" || bulkJob.State != "Open" { + return errors.New("error creating bulk data job. Id does not exist or job closed prematurely") + } + + uploadErr := uploadJobData(*sf.auth, records, *bulkJob) + if uploadErr != nil { + return uploadErr } return nil