-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathZKAFKA_ERROR_RECORDS.txt
264 lines (224 loc) · 11.6 KB
/
ZKAFKA_ERROR_RECORDS.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
* Copyright 2018-2020 Cargill Incorporated
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*&---------------------------------------------------------------------*
*& Report ZKAFKA_ERROR_RECORDS
*&---------------------------------------------------------------------*
REPORT zkafka_error_records.

* Row type for the distinct (configuration, table) pairs read from ZKAFKA_ERROR.
TYPES: BEGIN OF typ_error_tables,
         mt_id     TYPE zkafka_error-mt_id,
         tablename TYPE zkafka_error-tablename,
       END OF typ_error_tables.

* Content type sent with every Kafka REST request and the name of the
* TVARVC variable that holds the Kafka server URL.
CONSTANTS: lc_content_type  TYPE string     VALUE 'application/vnd.kafka.json.v2+json',
           lc_kafka_variant TYPE rvari_vnam VALUE 'ZBI_KAFKA_SERVER'.

* Generic field symbols.
FIELD-SYMBOLS: <any_tab>        TYPE ANY TABLE,
               <fs_record>      TYPE any,
               <lv_operation>   TYPE any,
               <lv_change_flag> TYPE char1,
               <lv_update_ts>   TYPE timestamp,
               <lv_recordno>    TYPE any,
               <fs_record_src>  TYPE any,
               <lv_belnr>       TYPE any,
               <ls_staging>     TYPE zkafka_error.

* HTTP call: target URL, client object and response details.
DATA: lv_kafka_url       TYPE string,
      lv_kafka_url_topic TYPE string,
      lv_response_string TYPE string,
      lv_return_code     TYPE i,
      lv_reason          TYPE string,
      lo_client          TYPE REF TO if_http_client.

* JSON payload assembly.
DATA: lo_json         TYPE REF TO /ui2/cl_json_serializer,
      lv_json         TYPE string,
      lv_json_final   TYPE string,
      lv_json_record  TYPE string,
      lv_json_records TYPE string,
      lo_data         TYPE REF TO data.

* Topic and table identification.
DATA: lv_tablename      TYPE tabname,
      lv_tabname        TYPE tabname,
      lv_table_string   TYPE c LENGTH 100,
      lv_structure_name TYPE char50,
      lv_topic          TYPE string,
      lv_topic_prefix   TYPE string,
      lo_staging        TYPE REF TO zcl_kafka.

* Timestamps and runtime measurement.
DATA: lv_timestamp    TYPE timestampl,
      lv_start_ts     TYPE timestampl,
      lv_end_ts       TYPE timestampl,
      lv_batch_ts     TYPE timestampl,
      lv_batch_end_ts TYPE timestampl,
      lv_log_ts       TYPE timestampl,
      lv_runtime      TYPE tzntstmpl.

* Error-table rows being reprocessed and the list of affected tables.
DATA: ls_staging       TYPE zkafka_error,
      lt_staging       TYPE STANDARD TABLE OF zkafka_error,
      ls_error         TYPE zkafka_error,
      ls_logging       TYPE zkafka_logging,
      lt_error_tables  TYPE STANDARD TABLE OF typ_error_tables,
      lwa_error_tables TYPE typ_error_tables.

* Counters and limits.
DATA: lv_count           TYPE i,
      lv_index           TYPE i,
      lv_recordno        TYPE int4,
      lv_batch_rec_count TYPE i,                 "records collected for the current batch
      lv_batch_count     TYPE i,                 "number of batches sent so far
      lv_batch_size      TYPE i VALUE 1000,      "records per batch unless P_BATCH overrides it
      lv_query_limit     TYPE i VALUE 10000000,  "row limit per table unless P_QUERY overrides it
      lv_total           TYPE i,
      lv_total_string    TYPE c LENGTH 10,
      lv_success         TYPE i,
      lv_error_count     TYPE i,
      lv_retry_attempts  TYPE i.

* Flags, message buffer, GUIDs and job identification.
DATA: lv_message        TYPE char255,
      lv_retry          TYPE c,
      lv_error_occurred TYPE c,
      lv_guid           TYPE sysuuid_x16,
      lv_run_guid       TYPE sysuuid_x16,
      lv_jobcount       TYPE tbtcjob-jobcount,
      lv_jobname        TYPE tbtcjob-jobname.

* Selection screen: configuration id, optional table filter, optional
* batch size and optional row limit.
PARAMETERS: p_mt    TYPE dmc_mt_header-id,
            p_tbl   TYPE tabname,
            p_batch TYPE i,
            p_query TYPE i.
* Apply the optional selection-screen overrides. Both parameters are
* integers, so an empty input arrives as 0 and keeps the declared default.
* Only positive values are accepted: a negative batch size would make every
* record its own batch, and a negative row limit is not a valid operand for
* UP TO n ROWS.
IF p_batch > 0.
  lv_batch_size = p_batch.
ENDIF.
IF p_query > 0.
  lv_query_limit = p_query.
ENDIF.

* Start from a clean state before any error records are loaded.
CLEAR: lv_error_count, lv_error_occurred, lt_staging, lv_batch_rec_count, lv_batch_count, lv_return_code.
* Reprocess the error records stored in ZKAFKA_ERROR.
*
* For every (mt_id, tablename) pair found in ZKAFKA_ERROR (optionally
* restricted by P_MT / P_TBL) the rows are read in createdon/recno order,
* wrapped into Kafka REST "records" payloads of up to lv_batch_size entries
* and POSTed to <kafka url>/<topic>/. A batch answered with HTTP 200 is
* deleted from ZKAFKA_ERROR; any other outcome stores the return code and
* reason on the last row of the failed batch and stops further sending.
IF p_tbl <> ''.
  SELECT DISTINCT mt_id tablename FROM zkafka_error INTO TABLE lt_error_tables WHERE mt_id = p_mt AND tablename = p_tbl.
ELSE.
  SELECT DISTINCT mt_id tablename FROM zkafka_error INTO TABLE lt_error_tables.
ENDIF.

LOOP AT lt_error_tables INTO lwa_error_tables.
  WRITE: / 'Reprocessing MT ID: ', lwa_error_tables-mt_id, ' table: ', lwa_error_tables-tablename.
  lv_message = |Reprocessing MT ID: { lwa_error_tables-mt_id ALIGN = LEFT } tabl: { lwa_error_tables-tablename ALIGN = LEFT }|.
  MESSAGE lv_message TYPE 'S'.

  " Load the stored error rows for this configuration/table, oldest first.
  SELECT mandt entryid mt_id tablename topicname returncode message entrydata createdon recno createdonstring
    FROM zkafka_error UP TO lv_query_limit ROWS INTO TABLE lt_staging WHERE mt_id = lwa_error_tables-mt_id AND tablename = lwa_error_tables-tablename
    ORDER BY createdon recno.
  SORT lt_staging BY createdon recno.
  lv_error_count = lines( lt_staging ).

  GET TIME STAMP FIELD lv_start_ts.

  " The Kafka server URL is maintained as a TVARVC parameter.
  SELECT SINGLE low INTO lv_kafka_url FROM tvarvc WHERE name = lc_kafka_variant AND type = 'P' AND numb = space.
  IF sy-subrc EQ 0 AND lv_kafka_url IS NOT INITIAL.
    lv_total = lines( lt_staging ).
    WRITE: / 'Attempting to send a total of ', lv_total, ' records to Kafka at', lv_start_ts , ' for Configuration ', lwa_error_tables-mt_id, ' and table ', lwa_error_tables-tablename.
    lv_message = |Attempting to send a total of: { lv_total ALIGN = LEFT } records to Kafka at: { lv_start_ts ALIGN = LEFT }|.
    MESSAGE lv_message TYPE 'S'.
    CLEAR lv_total.

    LOOP AT lt_staging ASSIGNING <ls_staging>.
      " Once a batch has failed nothing further is sent.
      IF lv_error_occurred <> 'X'.
        ADD 1 TO lv_total.
        CLEAR lv_kafka_url_topic.

        " Resolve the topic for this record. lv_topic is cleared first so
        " that a missing mapping falls back to 'slt.no_topic' instead of
        " silently reusing the topic found for a previous table.
        CLEAR lv_topic.
        SELECT topicname INTO lv_topic FROM zkafka_tbl_topic WHERE mt_id = <ls_staging>-mt_id AND tablename = <ls_staging>-tablename.
        ENDSELECT.
        IF lv_topic IS INITIAL.
          lv_topic = 'slt.no_topic'.
        ENDIF.

        ADD 1 TO lv_batch_rec_count.
        CLEAR lv_json_record.
        CONCATENATE '{"value":' <ls_staging>-entrydata '}' INTO lv_json_record."convert each entry into kafka format
        IF lv_json_records IS NOT INITIAL.
          CONCATENATE lv_json_records lv_json_record INTO lv_json_records SEPARATED BY ','."create entry strings
        ELSE.
          lv_json_records = lv_json_record."first entry
        ENDIF.

        " Either batch size is reached or table end has been reached
        IF lv_batch_rec_count GE lv_batch_size OR lv_total EQ lines( lt_staging ).
          ADD 1 TO lv_batch_count.
          CONCATENATE '{"records":[' lv_json_records ']}' INTO lv_json_final."converting the json payload into kafka's internal format

          " lv_topic is never initial here (see the fallback above), so the
          " URL always carries a topic segment.
          CONCATENATE lv_kafka_url '/' lv_topic '/' INTO lv_kafka_url_topic.
          TRANSLATE lv_kafka_url_topic TO LOWER CASE.

          " Classic exceptions that are not mapped in an EXCEPTIONS clause
          " end in a runtime error, so map them and treat a failure like a
          " failed batch: report it and stop sending.
          CALL METHOD cl_http_client=>create_by_url
            EXPORTING
              url                = lv_kafka_url_topic
            IMPORTING
              client             = lo_client
            EXCEPTIONS
              argument_not_found = 1
              plugin_not_active  = 2
              internal_error     = 3
              OTHERS             = 4.
          IF sy-subrc <> 0.
            lv_message = |HTTP client creation failed - { sy-subrc ALIGN = LEFT }|.
            MESSAGE lv_message TYPE 'S'.
            WRITE: / 'HTTP client creation failed for URL: ', lv_kafka_url_topic.
            lv_error_occurred = 'X'.
            CLEAR: lv_batch_rec_count, lv_json_records.
            CONTINUE.
          ENDIF.

          REPLACE ALL OCCURRENCES OF REGEX '":<' IN lv_json_final WITH '":'.
          " NOTE(review): as a regex, '\/' matches a plain '/', so this
          " statement looks like a no-op; if the goal is to unescape the
          " JSON sequence backslash-slash the pattern would need to be
          " '\\/' - confirm before changing.
          REPLACE ALL OCCURRENCES OF REGEX '\/' IN lv_json_final WITH '/'.

          lo_client->request->set_method( method = if_http_request=>co_request_method_post ).
          lo_client->request->set_content_type( content_type = lc_content_type ).
          lo_client->request->set_cdata( lv_json_final ).

          GET TIME STAMP FIELD lv_batch_ts.
          lv_message = |Topic: { lv_topic ALIGN = LEFT } Sending batch - { lv_batch_count ALIGN = LEFT } - with { lv_batch_rec_count ALIGN = LEFT } record(s) at { lv_batch_ts ALIGN = LEFT }|.
          MESSAGE lv_message TYPE 'S'.

          lo_client->send(
            EXCEPTIONS
              http_communication_failure = 1
              http_invalid_state         = 2
              OTHERS                     = 3
          ).
          IF sy-subrc <> 0.
            lv_message = |Send Failed - { sy-subrc ALIGN = LEFT }|.
            MESSAGE lv_message TYPE 'S'.
          ENDIF.

          lo_client->receive(
            EXCEPTIONS
              http_communication_failure = 1
              http_invalid_state         = 2
              http_processing_failed     = 3
              OTHERS                     = 4
          ).
          IF sy-subrc <> 0.
            lv_message = |Receive Failed - { sy-subrc ALIGN = LEFT }|.
            MESSAGE lv_message TYPE 'S'.
          ENDIF.

          CLEAR: lv_return_code, lv_reason.
          lv_response_string = lo_client->response->get_cdata( ).
          lo_client->response->get_status( IMPORTING code = lv_return_code reason = lv_reason ).
          lo_client->close( ).

          IF lv_return_code = 200."delete the entries
            IF lv_error_count > 0.
              " Remove everything up to and including the last record of
              " this batch from the error table.
              " NOTE(review): rows are ordered by createdon but deleted by
              " createdonstring - presumably both carry the same ordering;
              " verify against the table definition.
              DELETE FROM zkafka_error WHERE mt_id = lwa_error_tables-mt_id AND tablename = lwa_error_tables-tablename
                AND ( createdonstring < <ls_staging>-createdonstring OR ( createdonstring = <ls_staging>-createdonstring AND recno <= <ls_staging>-recno ) ).
              COMMIT WORK AND WAIT.
            ENDIF.
            ADD lv_batch_rec_count TO lv_success.
            lv_message = |Total Records sent to Kafka in sub-batch: { lv_batch_rec_count ALIGN = LEFT }. Return Code: { lv_return_code ALIGN = LEFT }. Reason: { lv_reason ALIGN = LEFT }|.
            MESSAGE lv_message TYPE 'S'.
            WRITE: / 'Total Records sent to Kafka in sub-batch: ', lv_batch_rec_count, ' Return Code: ', lv_return_code , ' Reason: ', lv_reason .
          ELSE."reset the entries
            lv_message = |Batch - { lv_batch_count ALIGN = LEFT } - ended with return code: { lv_return_code ALIGN = LEFT } and message: { lv_reason ALIGN = LEFT }|.
            MESSAGE lv_message TYPE 'S'.
            WRITE: / 'Batch - ', lv_batch_count, ' - ended with return code: ', lv_return_code , ' and message: ', lv_reason.
            <ls_staging>-returncode = lv_return_code.
            <ls_staging>-message = lv_reason.

            " Keep a copy of the failed row: the DELETE below removes it
            " from lt_staging, which would otherwise drop the return code
            " and reason before they reach the database, and the WHERE
            " clause must not read from a row that is being deleted.
            ls_error = <ls_staging>.
            DELETE lt_staging WHERE mt_id = lwa_error_tables-mt_id AND tablename = lwa_error_tables-tablename
              AND ( createdonstring < ls_error-createdonstring OR ( createdonstring = ls_error-createdonstring AND recno <= ls_error-recno ) ).
            MODIFY zkafka_error FROM TABLE lt_staging.
            " Persist the failure details on the last record of the batch.
            MODIFY zkafka_error FROM ls_error.
            COMMIT WORK AND WAIT.
            lv_error_occurred = 'X'.
          ENDIF.

          " Batch duration = end - start (subtract returns tstmp1 - tstmp2).
          GET TIME STAMP FIELD lv_batch_end_ts.
          lv_runtime = cl_abap_tstmp=>subtract( tstmp1 = lv_batch_end_ts tstmp2 = lv_batch_ts ).
          lv_message = |Batch - { lv_batch_count ALIGN = LEFT } - processing time: { lv_runtime ALIGN = LEFT }|.
          MESSAGE lv_message TYPE 'S'.

          CLEAR lv_batch_rec_count.
          CLEAR lv_json_records.
        ENDIF.
      ENDIF. " Checking whether an error occurred sending to Kafka
    ENDLOOP.

    GET TIME STAMP FIELD lv_end_ts.
    CLEAR lv_runtime.
    lv_runtime = cl_abap_tstmp=>subtract( tstmp1 = lv_end_ts tstmp2 = lv_start_ts ).
    lv_message = |Total Records Processed: { lv_total ALIGN = LEFT }. Successfully Posted: { lv_success ALIGN = LEFT }. Total Runtime(secs): { lv_runtime ALIGN = LEFT DECIMALS = 0 } |.
    MESSAGE lv_message TYPE 'S'.
  ELSE.
    " Without a server URL nothing can be sent; say so instead of skipping
    " the table silently.
    lv_message = |Kafka server URL not found in TVARVC variable { lc_kafka_variant ALIGN = LEFT } - nothing sent|.
    MESSAGE lv_message TYPE 'S'.
    WRITE: / 'Kafka server URL not found in TVARVC variable ', lc_kafka_variant.
  ENDIF.
ENDLOOP.