此文字已使用 Salesforce 的自動翻譯系統進行翻譯。參閱我們的 調查以提供此內容的回饋意見,並告訴我們您接下來想要查看的內容。
Note
概觀
現代 Salesforce 結構越來越多地由非同步處理提供技術支援;這不是為了便利起見,而是作為規模的策略需求。近年來,我們看見越來越多的公司面臨資料量激增、涉及多個接觸點的複雜整合,以及 24/7/365 執行之自發系統的崛起。所有這些項目都會推動結構設計師設計非同步優先的系統。
Salesforce 上的非同步處理通常意味著根據管理員限制和複雜度進行設計。這些限制會作為保護欄和結構限制,協助產生大量安全且可調整的系統。雖然沒有平台限制可直接管理複雜性,但設計模式可協助降低風險。在內部,Salesforce 經常會推動平台的邊界,進一步測試新功能並自動化複雜的業務流程。我們建立了一個以步驟為基礎的非同步處理架構,用於透過任意數量的步驟執行非同步工作。每個步驟都可以透過共用的監管控制項和完整的作業可視性,透過集中化記錄來獨立執行、重試和重新啟動。此文件概述其主要結構元件
重要道具
- 現代 Salesforce 結構應採用非同步優先方法,以達成規模、彈性和營運透明度。
- 將複雜工作拆分為可獨立執行的步驟,可實現可預測的效能、更安全的重試、檢查、回復和模組化演進,而無須重新設計核心工作流程。
- 此架構為單一和過時的批次工作、鏈結非同步呼叫和深度巢狀流程提供可調整的替代方案,並針對在 Salesforce 內必須水平調整且不需要在平台外協調流程的大量工作量所建立。
- 決定性與可觀察的執行可透過集中化記錄與管治來確保進度追蹤、SLA 監視、失敗診斷和稽核層級透明度。
- 針對企業層級的嚴格性設計,包括統一管理、合規性,以及跨長期執行業務流程的分散式州控制。
平台最佳作法
檢閱需求前,以下是使用此類架構的時機注意事項與秘訣。最重要的是,請考量哪個系統是單一事實來源。如果您的 Salesforce 組織最少依賴外部資料,但需要從數百筆到數百萬筆記錄進行調整,請考慮使用步驟型非同步架構。
在下列情況下,請使用此架構:
- 要採取動作的大多數 (或全部) 資訊已存在於您的 CRM 中。
- 維護「解壓縮負載」(ETL) 工作以協調外部資料的預先或進行中成本太高。
- 您必須在設定的排程中延後處理大量 Salesforce 記錄。
- 您可以將處理細分為離散步驟。例如,您可以建立階層式或以樹狀目錄為基礎的記錄集,特別是當資料量在階層或樹狀目錄下拉式時。
在下列情況下**,請勿**使用此架構:
- 建立或更新記錄需要立即重新計算。
- 整合是具有挑戰性的,因為外部系統主控記錄更新的主要資料。(請考慮使用大量 API 將更新的資料推送至 Salesforce。)
考慮到這些作法,讓我們檢閱我們的需求並開始建立。
細分需求
請思考問題陳述式:
若工作需要每日執行,請檢查某些記錄是否符合預先建立的條件,以便進一步處理。若已完成,請開始處理工作。處理記錄可能表示從多個外部系統提取資料以執行計算。工作中的步驟應透過 Slack 通知人員處理的記錄已準備好檢閱。步驟也應根據第一輪通知後的可設定的延遲,將通知升級至角色階層中的經理和上層。
此問題涉及數個不同的步驟,其中某些步驟可獨立進行。有許多方法可以分割工作。以下是一個分組:
- 排程器。
- 處理記錄的步驟介面和實作 (無論處理類型為何)。
- 組織步驟的處理器。
- 排程器呼叫的「Apex 可叫用」。
- 通知件。我們使用 Apex Slack SDK。
- 片語「可設定的延遲」中隱藏一些複雜性。稍���我們會在此文章中檢閱此複雜性。
以下是內建架構的意見圖:
現在,拆分圖表並開始建立部分。
透過已排程流程排程
「排程的流程」為排程機制提供數個優點:
- 「排程的流程」可以封裝並部署為中繼資料。這不適用於透過 Apex (或透過「排程的工作」頁面排程的工作)。
- 對於需要呼叫的架構而言,「等候」元素十分重要。透過在流程中使用,在架構的「可叫用」部分中不需要呼叫。
- 排程細微性符合需求:「排程的流程」的最短間隔為每日。如果您需要更高的頻率 (例如每小時),請針對此需求重新考量「排程流程」。
設定「已排程流程」時的其他考量事項為環境關聯。叫用 Apex 動作前,請新增評估 {!$Api.Enterprise_Server_URL_100} 變數的「決策」元素。這可確保工作只會在預期的環境中執行,例如 UAT 和「生產」。此模式很重要,因為在 SDLC 期間,Sandbox 會經常重新整理或新建立,且如果沒有明確的環境檢查,則「排程的流程」可能會無意中在架構不適用於執行的環境中執行。在「決策」元素中使用 contains 運算子,可讓設定對未來的 Sandbox 建立或 URL 變更具有彈性。
最後,考慮架構應如何捕捉失敗。當流程呼叫任何「動作」時,請一律新增錯誤路徑;例如,您可以將錯誤導向至 Nebula Logger 的「新增記錄項目」動作。Nebula Logger 會將記錄寫入至自訂物件,因此客戶應注意記錄資料會耗用組織儲存空間—依預設,記錄會儲存在組織內 14 天,然後再清除;此保留期間是可設定的。Nebula Logger 也使用「平台事件」發佈記錄,因此記錄項目會與主要資料處理交易獨立儲存,這可確保即使主要的「流程」或 Apex 動作回復,也會抓取失敗。當考慮新增記錄架構時,客戶應評估預期的記錄量與保留需求。
以下是流程的外觀:
讓我們繼續進行 Apex 程式碼的第一部分,且已滿足排程需求。
建立步驟介面
定義 Step 介面:
1public interface Step {
2 void execute();
3 void finalize();
4 String getName();
5 Boolean shouldRestart();
6}針對此文章,step 介面會顯示為外部類別以瞭解。架構本身是彈性的,小組可以使用他們偏好的任何 Apex 封裝模式來組織介面及其實作,只要所有「步驟」類別皆參照相同的介面即可。
我們介面中定義的方法有一些注意事項:
- 雖然目前沒有引數,但當我們傳遞
State類別 (或介面) 以在訂單重要時在步驟之間協調資料時,execute會改善。 getName可以傳回System.Type值,而非String。目標是讓協調流程層能夠記錄步驟名稱,而無須顯示其他內容。
以下是顯示這些組件如何相容的第一個具體實作。稍後只有一個例外,我們建議使用 Queueable Apex 在 Apex 中實作非同步處理;通常不需要批次 Apex (且不建議使用 @future 方法)。Apex 快速啟動,且搭配 Apex 游標,比批次 Apex 有許多優點。
Apex 游標類似實作
Apex 游標提供傳統批次 Apex 模型的現代替代方案。與「批次」處理類似,游標實作可以分部 (每批次最多 2,000) 提取記錄。然而,游標允許在單一交易內進行多個取用,以在大量作業中啟用大幅提升的輸送量。
當採用游標作為此架構的一部分時,小組應注意目前的測試與模擬限制。測試中的游標行為可能與生產行為不同,因此設計避免依賴游標內部的測試策略,並改為在邊界驗證協調流程邏輯十分重要。隨著平台的發展,這些領域將持續改善,但核心指引仍會維持:相較於許多使用個案的批次 Apex,游標提供更高的效能和更少的協調流程負擔。
若要在系統提供的游標與您自己的程式碼之間定義清楚的邊界,我們建議在實作 Step 介面時建立類似游標的表示。請思考一下此程式碼:
1public inherited sharing abstract class CursorStep implements Step {
2 private static final Integer MAX_CHUNK_SIZE = 2000;
3
4 protected Cursor cursor;
5
6 private Integer chunkSize = System.Limits.getLimitDMLRows();
7 private Integer position = 0;
8
9 protected abstract Cursor getCursor();
10 protected abstract void innerExecute(List<SObject> records);
11
12 public abstract String getName();
13
14 public virtual CursorStep withChunkSize(Integer chunkSize) {
15 this.chunkSize = chunkSize;
16 return this;
17 }
18
19 public void execute() {
20 this.cursor = this.cursor ?? this.getCursor();
21 this.cursor.setFetchesPerTransaction(this.getFetchesPerTransaction());
22 List<SObject> records = new List<SObject>();
23 if (this.shouldAdvance()) {
24 records = this.cursor.fetch(this.position, this.chunkSize);
25 this.position += this.chunkSize;
26 }
27 this.innerExecute(records);
28 }
29
30 public virtual void finalize() {
31 Logger.info('finished cursor step for ' + this.getName());
32 }
33
34 public virtual Boolean shouldRestart() {
35 return this.position < this.cursor.getNumRecords();
36 }
37
38 protected virtual Integer getFetchesPerTransaction() {
39 Integer maxRecordsPerFetchCall = 2000;
40 if (this.chunkSize < maxRecordsPerFetchCall) {
41 return this.chunkSize;
42 }
43 // Integer division rounds down
44 // which is perfect for our use-case
45 return this.chunkSize / maxRecordsPerFetchCall;
46 }
47
48 protected virtual Boolean shouldAdvance() {
49 return true;
50 }
51}記下 Cursor 類別。Apex 游標是 Database.Cursor 的例項,但我們的 Cursor 實作可讓我們靈活處理游標的缺點。以下是實作:
1public virtual without sharing class Cursor {
2 private static final Integer MAX_FETCHES_PER_TRANSACTION = Limits.getLimitFetchCallsOnApexCursor();
3
4 @TestVisible
5 private static Integer maxRecordsPerFetchCall = 2000;
6
7 private Integer cursorNumRecords;
8 private Integer fetchesPerTransaction = MAX_FETCHES_PER_TRANSACTION;
9 private final Database.Cursor cursor;
10
11 public Cursor(
12 String finalQuery,
13 Map<String, Object> bindVars,
14 System.AccessLevel accessLevel
15 ) {
16 try {
17 this.cursor = Database.getCursorWithBinds(finalQuery, bindVars, accessLevel);
18 } catch (FatalCursorException e) {
19 Logger.newEntry(
20 System.LoggingLevel.WARN,
21 'Error creating cursor. This can happen if there' +
22 ' are no records returned by the query: ' + e.getMessage()
23 );
24 }
25 }
26
27 public Cursor setFetchesPerTransaction(Integer possibleFetchesPerTransaction) {
28 // Handle accidental round downs from Integer division
29 if (possibleFetchesPerTransaction == 0) {
30 return this;
31 }
32 if (possibleFetchesPerTransaction > MAX_FETCHES_PER_TRANSACTION) {
33 Logger.newEntry(
34 System.LoggingLevel.DEBUG,
35 'Fetches per transaction: ' +
36 possibleFetchesPerTransaction +
37 ' exceeded platform max fetches per transaction: ' +
38 MAX_FETCHES_PER_TRANSACTION +
39 ', defaulting to platform max'
40 );
41 possibleFetchesPerTransaction = MAX_FETCHES_PER_TRANSACTION;
42 }
43 this.fetchesPerTransaction = possibleFetchesPerTransaction;
44 return this;
45 }
46
47 @SuppressWarnings('PMD.EmptyStatementBlock')
48 protected Cursor() {
49 }
50
51 public virtual List<SObject> fetch(Integer start, Integer advanceBy) {
52 if (this.getNumRecords() == 0) {
53 Logger.newEntry(
54 System.LoggingLevel.DEBUG,
55 'Bypassing fetch call, no records to fetch'
56 );
57 return new List<SObject>();
58 }
59 Integer localStart = start;
60 List<SObject> results = new List<SObject>();
61 while (
62 Limits.getFetchCallsOnApexCursor() < this.fetchesPerTransaction &&
63 results.size() < this.getNumRecords() &&
64 localStart < start + advanceBy
65 ) {
66 Integer actualAdvanceBy = this.getAdvanceBy(localStart, advanceBy);
67 results.addAll(this.cursor?.fetch(localStart, actualAdvanceBy) ?? new List<SObject>());
68 localStart += actualAdvanceBy;
69 }
70 return results;
71 }
72
73 public virtual Integer getNumRecords() {
74 this.cursorNumRecords = this.cursorNumRecords ?? this.cursor?.getNumRecords() ?? 0;
75 return this.cursorNumRecords;
76 }
77
78 protected Integer getAdvanceBy(Integer start, Integer advanceBy) {
79 Integer possibleFetchSize = Math.min(advanceBy, this.getNumRecords() - start);
80 if (possibleFetchSize > maxRecordsPerFetchCall) {
81 Logger.newEntry(
82 System.LoggingLevel.DEBUG,
83 'Fetch size: ' +
84 possibleFetchSize +
85 ' exceeded platform max fetch size of ' +
86 maxRecordsPerFetchCall +
87 ', defaulting to max fetch size'
88 );
89 possibleFetchSize = maxRecordsPerFetchCall;
90 } else if (possibleFetchSize < 0) {
91 possibleFetchSize = 0;
92 }
93 return possibleFetchSize;
94 }
95}對於本文的其餘部分,我們會在參照 Apex 類別時省略 sharing 宣告。在實際上,請確保透過或不透過共用明確使用最上層類別,以符合您的物件模型和權限。
另請注意,我們的 Cursor 實作會委派至平台 Database.Cursor,接下來會討論其他優點。
首先,以下是對應的測試:
1@IsTest
2private class CursorTest {
3 @IsTest
4 static void itCapsAdvanceByArgument() {
5 String accountName = 'helloWorld!';
6 insert new Account(Name = accountName);
7 String query = 'SELECT Name FROM Account WHERE Name = :bindVar0';
8 Map<String, Object> bindVars = new Map<String, Object>{ 'bindVar0' => accountName };
9
10 Cursor instance = new Cursor(query, bindVars, System.AccessLevel.SYSTEM_MODE);
11
12 Assert.areEqual(1, instance.getNumRecords());
13 Assert.areEqual(accountName, instance.fetch(0, 1000).get(0).get('Name'));
14 Assert.areEqual(1, System.Limits.getApexCursorRows());
15 }
16
17 @IsTest
18 static void itCapsMaxRecordsPerFetchCall() {
19 Cursor.maxRecordsPerFetchCall = 20;
20 Integer oneMoreThanMaxFetch = Cursor.maxRecordsPerFetchCall + 1;
21
22 List<Account> accounts = new List<Account>();
23 for (Integer i = 0; i < oneMoreThanMaxFetch; i++) {
24 accounts.add(new Account(Name = 'Fetch ' + i));
25 }
26 insert accounts;
27
28 Exception ex;
29 List<SObject> results;
30 Cursor instance = new Cursor(
31 'SELECT Id FROM Account',
32 new Map<String, Object>(),
33 System.AccessLevel.SYSTEM_MODE
34 );
35 try {
36 results = instance.fetch(0, oneMoreThanMaxFetch);
37 } catch (System.InvalidParameterValueException e) {
38 ex = e;
39 }
40
41 Assert.areEqual(null, ex?.getMessage());
42 Assert.areEqual(2, Limits.getFetchCallsOnApexCursor());
43 Assert.areEqual(oneMoreThanMaxFetch, results.size());
44 }
45
46 @IsTest
47 static void itFetchesMultipleTimesPerTransactionWhenMoreThanMaxFetch() {
48 Cursor.maxRecordsPerFetchCall = 20;
49 List<Account> accounts = new List<Account>();
50 Set<String> expectedFetchNames = new Set<String>();
51 for (Integer i = 0; i < Cursor.maxRecordsPerFetchCall + 1; i++) {
52 String accountName = 'Fetch' + i;
53 expectedFetchNames.add(accountName);
54 accounts.add(new Account(Name = accountName));
55 }
56 insert accounts;
57
58 Integer oneMoreThanMaxFetch = Cursor.maxRecordsPerFetchCall + 1;
59 Cursor instance = new Cursor(
60 'SELECT Name FROM Account',
61 new Map<String, Object>(),
62 System.AccessLevel.SYSTEM_MODE
63 );
64 List<SObject> results = instance.setFetchesPerTransaction(2).fetch(0, oneMoreThanMaxFetch);
65
66 Assert.areEqual(Cursor.maxRecordsPerFetchCall + 1, results.size());
67 Assert.areEqual(2, Limits.getFetchCallsOnApexCursor());
68 Set<String> actuallyFetchedNames = new Set<String>();
69 for (Account account : (List<Account>) results) {
70 actuallyFetchedNames.add(account.Name);
71 }
72 Assert.areEqual(expectedFetchNames, actuallyFetchedNames);
73 }
74
75 @IsTest
76 static void itFetchesMultipleTimesPerTransaction() {
77 Cursor.maxRecordsPerFetchCall = 1;
78 insert new List<Account>{ new Account(Name = 'One'), new Account(Name = 'Two') };
79
80 Cursor instance = new Cursor(
81 'SELECT Id FROM Account',
82 new Map<String, Object>(),
83 System.AccessLevel.SYSTEM_MODE
84 )
85 .setFetchesPerTransaction(2);
86 List<SObject> results = instance.fetch(0, 2);
87
88 Assert.areEqual(2, instance.getNumRecords());
89 Assert.areEqual(2, results.size());
90 results = instance.fetch(2, 1);
91 Assert.areEqual(0, results.size());
92 }
93
94 @IsTest
95 static void fetchesCorrectAmountOfRecords() {
96 List<Account> accounts = new List<Account>();
97 for (Integer i = 0; i < 10; i++) {
98 accounts.add(new Account(Name = 'Fetch ' + i));
99 }
100 insert accounts;
101
102 Cursor instance = new Cursor(
103 'SELECT Id FROM Account',
104 new Map<String, Object>(),
105 System.AccessLevel.SYSTEM_MODE
106 )
107 .setFetchesPerTransaction(10);
108 List<SObject> results = instance.fetch(0, 2);
109
110 Assert.areEqual(2, results.size(), '' + results);
111 Assert.areEqual(1, Limits.getFetchCallsOnApexCursor());
112 }
113
114 @IsTest
115 static void doesNotExceedPlatformMaxFetch() {
116 List<Account> accounts = new List<Account>();
117 for (Integer i = 0; i < 101; i++) {
118 accounts.add(new Account(Name = 'Fetch ' + i));
119 }
120 insert accounts;
121
122 Test.startTest();
123 Cursor instance = new Cursor(
124 'SELECT Id FROM Account',
125 new Map<String, Object>(),
126 System.AccessLevel.SYSTEM_MODE
127 )
128 .setFetchesPerTransaction(100);
129 Integer counter = 0;
130 List<SObject> results;
131 while (counter <= 100) {
132 results = instance.fetch(counter, counter + 1);
133 counter++;
134 }
135 Test.stopTest();
136
137 Assert.areEqual(101, counter);
138 Assert.areEqual(0, results.size());
139 }
140}透過將 CursorStep 實作設 Cursor 為虛擬Database.Cursor,當實作不需要重複處理大型記錄集時,可在沒有實作的情況下作業,類似於在批次中傳回 System.Iterable<T> Apex 而非 Apex。Database.QueryLocator範例如下:
1public abstract class CursorLikeImplementation extends CursorStep {
2 private final Cursor cursorLike;
3
4 public CursorLikeImplementation(List<SObject> previouslyRetrievedRecords) {
5 this.cursorLike = new CursorLike(previouslyRetrievedRecords);
6 }
7
8 public override String getName() {
9 return CursorLikeImplementation.class.getName();
10 }
11
12 public override Cursor getCursor() {
13 return this.cursorLike;
14 }
15
16 private class CursorLike extends Cursor {
17 private final List<SObject> records;
18
19 public CursorLike(List<SObject> records) {
20 super();
21 this.records = records;
22 }
23
24 public override List<SObject> fetch(Integer position, Integer chunkSize) {
25 // clone, to keep the underlying list type
26 List<SObject> clonedRecords = this.records.clone();
27 clonedRecords.clear();
28 for (Integer i = position; i < this.getAdvanceBy(position, chunkSize); i++) {
29 clonedRecords.add(this.records[i]);
30 }
31 return clonedRecords;
32 }
33
34 public override Integer getNumRecords() {
35 return this.records.size();
36 }
37 }
38}請注意,由於此類別也是抽象類別,因此會將 innerExecute 的具體實作保留給子類別。
CursorLike 內部子類別也有替代項目。如果您知道類似此步驟的具體版本不會通過其他管理員限制,您可以從 CursorLike.fetch 傳回 this.records,並覆寫父系 CursorStep.shouldRestart() 以傳回 false。這可讓您重複處理僅限於每個非同步交易 12 MB Apex 堆疊限制的清單。
其他可能以步驟為基礎的實作
我們以游標為基礎的實作可讓我們在分頁大量資料時有足夠的彈性。同時,Step介面可讓我們彈性描述和壓縮各種步驟。
請思考流程式步驟:
1public virtual class FlowStep implements Step {
2 private final Invocable.Action specificFlow;
3
4 private Boolean shouldRestart = false;
5
6 public FlowStep(String specificFlowName, Map<String, Object> inputs) {
7 this.specificFlow = Invocable.Action.createCustomAction('flow', specificFlowName);
8 this.specificFlow.setInvocations(new List<Map<String,Object>>{ inputs });
9 }
10
11 public void execute() {
12 List<Invocable.Action.Result> results = this.specificFlow.invoke();
13 for (Invocable.Action.Result result : results) {
14 if (result.isSuccess()) {
15 Map<String, Object> outputParams = result.getOutputParameters();
16 Object potentialShouldRestartValue = outputParams.get('shouldRestart');
17 // Flow does not enforce Booleans being initialized
18 // so a null check is sadly necessary here
19 if (potentialShouldRestartValue != null) {
20 this.shouldRestart = this.shouldRestart ||
21 Boolean.valueOf(potentialShouldRestartValue);
22 }
23 } else {
24 List<String> errorMessages = new List<String>();
25 for (Invocable.Action.Error error : result.getErrors()) {
26 errorMessages.add(
27 'Error code: ' + error.getCode() +
28 ', error message: ' + error.getMessage()
29 );
30 }
31 Logger.error(
32 'An error occurred within your auto-launched flow:\n' +
33 String.join(errorMessages, '\n\t')
34 );
35 }
36 }
37 }
38
39 public virtual void finalize() {
40 Logger.info(this.getName() + ' finished processing');
41 }
42
43 public String getName() {
44 return FlowStep.class.getName() + ':' + this.specificFlow.getName();
45 }
46
47 public Boolean shouldRestart() {
48 return this.shouldRestart;
49 }
50}由於「流程」無法傳回符合 Apex 定義類型的輸出參數,因此在使用前,我們會先檢查是否有 shouldRestart 輸出參數。
某些步驟可能為功能標記。您可以實作邏輯來決定要包含哪些步驟,或針對停用的功能使用 no-op 步驟。空白物件模式是降低協調流程層內複雜性的常見方法:
1@SuppressWarnings('PMD.EmptyStatementBlock')
2public class NoOpStep implements Step {
3 // The null object pattern is commonly implemented
4 // as a singleton to reduce memory consumption
5 public static final NoOpStep SELF {
6 get {
7 SELF = SELF ?? new NoOpStep();
8 }
9 private set;
10 }
11
12 public void execute() {
13 }
14
15 public void finalize() {
16 }
17
18 String getName() {
19 return NoOpStep.class.getName();
20 }
21
22 Boolean shouldRestart() {
23 return false;
24 }
25}我們現在有幾個要處理的建構區塊。讓我們看一下負責逐步解說步驟的協調流程層。
建立步驟處理器
處理器是結構中的轉換點。我們必須決定誰定義要初始化哪些步驟以及何處。選項包括:
- 讓處理器定義哪些步驟會對應至業務邏輯。此選項很簡單,但為了可讀性,其調整程度不佳。
- 使用自訂中繼資料 (CMDT) 定義對應。「中繼資料關係」欄位不支援
ApexClass,這會將類別名稱拼字寬鬆地與您的業務流程設定配對。您可以將欄位設為選項清單,並驗證類型是否存在 (Type.forName()或查詢ApexClass),以降低管理員風險,但由於 CMDT 記錄不支援觸發,因此會在執行階段進行驗證。此路線可測試,但管理員只能在生產環境中建立 CMDT 記錄—請小心進行。 - 定義記錄的對應。非管理員可以設定步驟,但部署會變得較困難,且環境可能會漂移。請謹慎進行。
在 Clean Code 中有關如何處理此特定複雜性的著名引文:
此問題的解決方法是將
switch陳述式 [用於建立物件] 埋藏在抽象工廠的地下室,而不要讓任何人看見它。
考慮到這一點,而且由於我們目前的步驟數量已定義正確,且不太可能成長,因此步驟處理器也可以成為步驟的工廠。這可以使用列舉來驅動切換陳述式:
1public enum StepType {
2 TYPE_ONE,
3 TYPE_TWO,
4 TYPE_THREE,
5 TYPE_FOUR
6 // etc ...
7}然後,針對我們的 StepProcessor:
1public StepProcessor implements System.Queueable, System.Finalizer,
2 Database.AllowsCallouts {
3 private final List<Step> steps = new List<Step>();
4
5 private Step currentStep;
6
7 public StepProcessor setSteps(List<StepType> stepTypes) {
8 for (StepType type : stepTypes) {
9 switch on type {
10 WHEN TYPE_ONE {
11 this.addTypeOneSteps();
12 }
13 WHEN TYPE_TWO {
14 this.addTypeTwoSteps();
15 }
16 // ... etc
17 }
18 }
19 this.cleanSteps();
20 return this;
21 }
22
23 public void execute(System.QueueableContext context) {
24 this.currentStep = this.currentStep ?? this.steps.remove(0);
25 if (context != null) {
26 System.attachFinalizer(this);
27 Logger.setAsyncContext(context);
28 }
29 Logger.info('Executing step ' + this.currentStep.getName());
30 try {
31 this.currentStep.execute();
32 } catch (Exception e) {
33 Logger.exception('Unexpected exception', e);
34 }
35 Logger.info('Finished executing step ' + this.currentStep.getName());
36 Logger.saveLog();
37 }
38
39 public void execute(System.FinalizerContext context) {
40 Logger.info('Executing finalizer for step ' + this.currentStep.getName());
41 Logger.setAsyncContext(context);
42 switch on context?.getResult() {
43 when UNHANDLED_EXCEPTION {
44 // see the note below about this logging paradigm
45 Logger.warn(
46 'Failed to run on step' + this.currentStep,
47 context?.getException()
48 );
49 }
50 when else {
51 this.currentStep.finalize();
52 if (this.currentStep.shouldRestart()) {
53 this.kickoff();
54 } else if (this.steps.isEmpty() == false) {
55 this.currentStep = this.steps.remove(0);
56 this.kickoff();
57 } else {
58 Logger.info('Finished executing steps');
59 }
60 }
61 }
62 Logger.info(
63 'Finished executing finalizer for step ' +
64 this.currentStep.getName()
65 );
66 Logger.saveLog();
67 }
68
69 public String kickoff() {
70 return this.steps.isEmpty() ? null : System.enqueueJob(this);
71 }
72
73 private void cleanSteps() {
74 for (Integer reverseIndex = this.steps.size() - 1;
75 reverseIndex >= 0; reverseIndex--) {
76 if (this.steps[reverseIndex] instanceof NoOpStep) {
77 this.steps.remove(reverseIndex);
78 }
79 }
80 }
81
82 private void addTypeOneSteps() {
83 this.steps.addAll(
84 new List<Step> {
85 new ExampleCursorStepOne(),
86 new ExampleCursorStepTwo()
87 }
88 );
89 }
90
91 private void addTypeTwoSteps() {
92 this.steps.addAll(
93 new List<Step> {
94 new FlowStep('
95 ExampleInvocableName',
96 new Map<String, Object>( 'exampleParameter' => true)
97 ),
98 new ExampleCursorStepThree()
99 }
100 );
101 }
102}顯示的工廠方法 (例如 addTypeOneSteps()) 可以委派如功能標記等疑慮;cleanSteps() 會對收集的步驟執行一次性檢查,以確保在真正非同步化之前沒有任何「空白」步驟。如下所示:
1private Step getStepOrDefault(String customPermissionName, Step defaultStep) {
2 if (System.FeatureManagement.checkPermission(customPermissionName)) {
3 return defaultStep;
4 }
5
6 return NoOpStep.SELF;
7}自在「排程的流程」區段中提及「Nubula Logger」後,我們尚未討論錯誤處理。這是因為 System.Finalizer 可讓我們全面說明所有錯誤狀況的記錄,而無須在每個步驟中新增特定錯誤處理。每個 Step 都專注於執行,同時我們會記錄和取消任何不滿的路徑,使其在單元測試中出現。這支援安全迭代和生產層級警示 (針對所有 WARN 和 ERROR 記錄使用 Slack Logger for Nebula 外掛程式)。
關於錯誤記錄的一個注意事項:將步驟例項傳送至記錄訊息會假設在記錄中會顯示的內容有某種 Trust 層級。Apex 類別的預設 toString() 包含訊息中的所有靜態與例項級內容。這可能是可取的,或者可能會洩露敏感資訊。雖然此處的重點並非記錄和安全性,但請注意,對於某些系統,遵循如 Step 的介面也可能涉及強制覆寫 toString()。
1public interface Step {
2 void execute();
3 void finalize();
4 String getName();
5 Boolean shouldRestart();
6 String toString();
7}這種方法會讓每個物件建立者負責決定允許列印的項目,這可能是可取的。
在記錄層級:在 StepProcessor 層級,我們使用 INFO,即最高的非錯誤層級。當您在應用程式中更精確時,記錄層級應隨之減少。個別步驟可能會使用 DEBUG 作為概況資訊,並將 FINE、FINER 和 FINEST 保留為更詳細的輸出。記錄是一種藝術與科學,但遵循這些原則有助於讓記錄保持一致且實用。
處理步驟處理器內的額外複雜性
繼續之前,讓我們簡要思考一下讓步驟處理器主控步驟用途邏輯的決定。在大型程式碼庫中,請考慮將 StepProcessor 設為虛擬或抽象,並讓子類別識別特定步驟以建立適當的關注分隔。
Apex 可叫用層
排程器最終會叫用 Apex。設定剩餘完成後,「可叫用 Apex」區段可決定應執行的步驟,並將 List<StepType> 傳送至處理器:
1public class DailyJobExecutor {
2 @InvocableMethod(label='Execute Daily Job')
3 public static void executeJob() {
4 Logger.info('Executing daily Job');
5
6 List<StepType> correspondingTypes = new List<StepType>();
7 // based on [business logic], determine which step types
8 // should be included for any daily invocation
9
10 if (correspondingTypes.isEmpty() == false) {
11 try {
12 new StepProcessor().setSteps(correspondingTypes).kickoff();
13 } catch (Exception ex) {
14 Logger.exception('Error starting job', ex);
15 }
16 }
17 }
18
19 Logger.saveLog();
20}這是方程的一個簡單部分,使用記錄、資料或邏輯來決定要執行的步驟類型。「可叫用動作」很簡單,因為我們在其他位置壓縮了複雜性。我們也保護您免受非預期的例外狀況,並且讓每個組件容易進行隔離測試。
處理呼叫 Slack 前的延遲
Apex Slack SDK 超出此文章的範圍,但需要重新查看一個潛在的問題:根據可設定的延遲在角色階層中向上通知人員。在紙本上,這很簡單,您可能會在 StepProcessor 中考量 System.enqueueJob(this)。使用 System.AsyncOptions,我們的初始傾向是使用 enqueueJob 超額來滿足此需求。
不過,目前透過 System.AsyncOptions.MinimumQueueableDelayInMinutes 延遲的上限為 10 分鐘。由於需求為 120 分鐘,因此仍有一些選項。天真的方法可能如下所示:
1public class ExampleDelayedNotifier implements Step {
2 private final List<Slack.ChatPostMessageRequest> notifications = new List<Slack.ChatPostMessageRequest>();
3 private final Slack.BotClient botClient = Slack.App
4 .getAppByKey('some-slack-app-key')
5 .getBotClientForTeam('slack team id');
6
7 // account for the initial delay,
8 // so 120 - 10 = 110
9 private Integer delayMinutes = 110;
10
11 public void execute() {
12 if ( this.delayInMinutes > 0) {
13 return;
14 }
15
16 Integer maximumAllowedCallouts = 100;
17 while (this.notifications.isEmpty() == false && maximumAllowedCallouts > 0) {
18 this.botClient.chatPostMessage(this.notifications.remove(0));
19 maximumAllowedCallouts--;
20 }
21 }
22
23 public void finalize() {
24 this.delayInMinutes -= 10;
25 }
26
27 public String getName() {
28 return ExampleDelayedNotifier.class.getName();
29 }
30
31 public Boolean shouldRestart() {
32 return this.delayInMinutes > 0 || this.notifications.isEmpty() == false;
33 }
34}實際上,延遲會傳遞至此類別,因為延遲是以組態為導向。
除非您確定只會有一個延遲通知類型,否則我們不建議使用此方法。它會在開始前耗用 11 個額外的非同步工作 (如果延遲增加,則會耗用更多工作)。該成本適用於一個工作,而不適用於許多工作。您也需要將方法新增至 Step 介面,讓每個步驟都能告知處理器重新啟動前要等待的時間,進而增加雜訊。
這讓我們有兩個有趣的可能性:
- 如果您已在適當的時間間隔排程輪詢工作,則可以將延遲的步驟插入現有工作架構。您也應確定指定延遲達到 15 分鐘後 (Apex 排程 CRON 運算式的最短重新整理間隔為 15 分鐘)。這與可叫用 Apex 範例大致相符;排程會改為透過 Apex 執行。換句話說,您可以重複使用相同的基於
Step的結構,根據「開始之後」時間戳記處理記錄,並根據選項清單或多重選擇選項清單對應回先前顯示的StepType列舉值來決定要使用的步驟。 - 或者,如果您喜歡定義額外 Apex 類別,請使用
System.scheduleBatch()返回批次 Apex (不同於支援內部類別的 Queueable Apex,批次 Apex 類別必須是外部類別)。
請思考批次 Apex 範例。雖然我們一般建議使用「可排列 Apex」以獲得彈性和控制能力,但這是一種批次 Apex 仍是最高的個案:
1public class DelayedNotifier implements Database.Batchable<Object> {
2 private final StepProcessor processor = new StepProcessor();
3
4 public Iterable<Object> start(Database.BatchableContext bc) {
5 return new List<Object>();
6 }
7
8 @SuppressWarnings('PMD.EmptyStatementBlock')
9 public void execute(Database.BatchableContext bc, Object scope) {
10 // we don't need to actually do anything in execute,
11 // we just need to start up the processor in finish
12 }
13
14 public void finish(Database.BatchableContext bc) {
15 try {
16 // you can imagine Notifier as an elided,
17 // simpler version of the naive implementation
18 // we showed above, now only focused on sending messages
19 this.processor.setSteps(new List<Step>{ new Notifier() }).kickoff();
20 } catch (Exception ex) {
21 Logger.exception('Unexpected error', ex);
22 } finally {
23 Logger.saveLog();
24 }
25 }
26}然後,在 StepProcessor 中,想像一下先前顯示的 addTypeOneSteps() 方法已更新為此延遲步驟:
1public StepProcessor implements System.Queueable, System.Finalizer,
2 Database.AllowsCallouts {
3 // .... unchanged top of class elided
4
5 private void addTypeOneSteps() {
6 this.steps.addAll(
7 new List<Step> {
8 new ExampleCursorStepOne(),
9 new ExampleCursorStepTwo(),
10 new DelayedNotifierStep()
11 }
12 );
13 }
14
15 // ...
16
17 private class DelayedNotifierStep implements Step {
18 private final DelayedNotifier delayedNotifier = new DelayedNotifier();
19 // again — in practice this value would also be passed in
20 private final Integer delayInMinutes = 120;
21
22 public void execute() {
23 System.scheduleBatch(
24 this.delayedNotifier,
25 'Delayed notifier: ' + System.now().getTime(),
26 this.delayInMinutes
27 );
28 }
29
30 public void finalize() {
31 Logger.debug('Nothing to finalize, batch scheduled');
32 }
33
34 public String getName() {
35 return DelayedNotifierStep.class.getName();
36 }
37
38 public Boolean shouldRestart() {
39 return false;
40 }
41 }
42}雖然我們通常不建議這麼多跳轉,但此步驟延遲會變成另一個可重複使用的建構區塊。在 Apex 允許較長的延遲之前,這也代表產生此效果的最簡單方式 (如同所述,不需要輪詢機制)。
結論
我們已使用物件導向設計來滿足需求,並建立了一個可調整規模的系統,同時平衡建立與維護的長期成本。雖然步驟宣告和例項化最終可能會超出其在 StepProcessor 中的位置,但此處沒有額外的技術負債。透過 FlowStep,管理員與開發人員可以共同決定何時無程式碼或 Pro-code 解決方案最有意義。
透過在 Apex 的 Queueable 架構中使用 System.Finalizer 介面,並與 Nebula Logger 搭配使用,我們建立了可測試的強大系統,即使未來的步驟缺乏明確的記錄,也會警示我們發生非預期的失敗。對於我們而言,此系統會成功地縮小數字,並降低成本和複雜性。它也為我們提供了對實際工作負載下 Apex 游標行為的寶貴洞察,協助我們精簡方法,同時改善功能本身。
透過將複雜、大量的工作量拆分成模組化執行步驟,以步驟為基礎的非同步處理架構架構可將平台限制轉換為工程優點,以實現企業規模的可預測效能、可觀察性與管治。步驟可以由管理員和開發人員設定,在任一情況下,步驟作者都可以安全地專注於觀察基本的平台管理員限制 (例如 DML 列,以及檢取的查詢列),而不必擔心如何調整每個步驟。
繼續路徑
若要在企業實作之間作業並採用此模式,結構設計師應:
- 評估現有自動化以識別非同步協調流程可協助改善效能並增強可觀察性的區域。
- 透過明確的處理目標和離散的作者點 (例如流程或 Apex),將大型程序分成離散、獨立執行的步驟。
- 定義並分組步驟類型,以加速跨業務單位的步驟重複使用與標準化。
- 使用新流程或現有自動化來 試用方法。您可能會驚訝地發現您在步驟內可免費找到多少個極端個案、關注您的內建記錄和可觀察性!
關於作者
James Simone 是 Salesforce 的首席軟體工程師,在平台上擁有十多年的經驗。他曾是 Salesforce 客戶和產品擁有者,在轉為開發之前,從 2019 年開始在 Apex 的樂趣中撰寫有關 Salesforce 的技術深入分析。他先前曾在 Salesforce 開發人員部落格以及 Salesforce Engineering 部落格上發佈文章。
8 minute read
