60
60
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .VERSION ;
61
61
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .cacheOperation ;
62
62
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .cacheRecordSize ;
63
- import static org .apache .ignite .internal .processors .performancestatistics .OperationType .cacheStartRecordSize ;
64
63
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .checkpointRecordSize ;
65
64
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .jobRecordSize ;
66
65
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .pagesWriteThrottleRecordSize ;
67
66
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .queryReadsRecordSize ;
68
- import static org .apache .ignite .internal .processors .performancestatistics .OperationType .queryRecordSize ;
69
- import static org .apache .ignite .internal .processors .performancestatistics .OperationType .taskRecordSize ;
67
+ import static org .apache .ignite .internal .processors .performancestatistics .OperationType .readCacheStartRecordSize ;
68
+ import static org .apache .ignite .internal .processors .performancestatistics .OperationType .readQueryPropertyRecordSize ;
69
+ import static org .apache .ignite .internal .processors .performancestatistics .OperationType .readQueryRecordSize ;
70
+ import static org .apache .ignite .internal .processors .performancestatistics .OperationType .readQueryRowsRecordSize ;
71
+ import static org .apache .ignite .internal .processors .performancestatistics .OperationType .readTaskRecordSize ;
70
72
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .transactionOperation ;
71
73
import static org .apache .ignite .internal .processors .performancestatistics .OperationType .transactionRecordSize ;
72
74
@@ -267,48 +269,21 @@ else if (transactionOperation(opType)) {
267
269
return true ;
268
270
}
269
271
else if (opType == QUERY ) {
270
- if (buf .remaining () < 1 )
271
- return false ;
272
-
273
- boolean cached = buf .get () != 0 ;
274
-
275
- String text ;
276
- int hash = 0 ;
277
-
278
- if (cached ) {
279
- if (buf .remaining () < 4 )
280
- return false ;
281
-
282
- hash = buf .getInt ();
283
-
284
- text = knownStrs .get (hash );
285
-
286
- if (buf .remaining () < queryRecordSize (0 , true ) - 1 - 4 )
287
- return false ;
288
- }
289
- else {
290
- if (buf .remaining () < 4 )
291
- return false ;
292
-
293
- int textLen = buf .getInt ();
294
-
295
- if (buf .remaining () < queryRecordSize (textLen , false ) - 1 - 4 )
296
- return false ;
272
+ ForwardableString text = readString (buf );
297
273
298
- text = readString ( buf , textLen );
299
- }
274
+ if ( text == null || buf . remaining () < readQueryRecordSize ())
275
+ return false ;
300
276
301
277
GridCacheQueryType qryType = GridCacheQueryType .fromOrdinal (buf .get ());
302
278
long id = buf .getLong ();
303
279
long startTime = buf .getLong ();
304
280
long duration = buf .getLong ();
305
281
boolean success = buf .get () != 0 ;
306
282
307
- if (text == null )
308
- forwardRead (hash );
283
+ forwardRead (text );
309
284
310
285
for (PerformanceStatisticsHandler hnd : curHnd )
311
- hnd .query (nodeId , qryType , text , id , startTime , duration , success );
286
+ hnd .query (nodeId , qryType , text . str , id , startTime , duration , success );
312
287
313
288
return true ;
314
289
}
@@ -328,86 +303,64 @@ else if (opType == QUERY_READS) {
328
303
return true ;
329
304
}
330
305
else if (opType == QUERY_ROWS ) {
331
- String action = readCacheableString (buf );
306
+ ForwardableString action = readString (buf );
332
307
333
- if (action == null || buf .remaining () < 1 + 16 + 8 + 8 )
308
+ if (action == null || buf .remaining () < readQueryRowsRecordSize () )
334
309
return false ;
335
310
336
311
GridCacheQueryType qryType = GridCacheQueryType .fromOrdinal (buf .get ());
337
312
UUID uuid = readUuid (buf );
338
313
long id = buf .getLong ();
339
314
long rows = buf .getLong ();
340
315
316
+ forwardRead (action );
317
+
341
318
for (PerformanceStatisticsHandler hnd : curHnd )
342
- hnd .queryRows (nodeId , qryType , uuid , id , action , rows );
319
+ hnd .queryRows (nodeId , qryType , uuid , id , action . str , rows );
343
320
344
321
return true ;
345
322
}
346
323
else if (opType == QUERY_PROPERTY ) {
347
- String name = readCacheableString (buf );
324
+ ForwardableString name = readString (buf );
348
325
349
326
if (name == null )
350
327
return false ;
351
328
352
- String val = readCacheableString (buf );
329
+ ForwardableString val = readString (buf );
353
330
354
331
if (val == null )
355
332
return false ;
356
333
357
- if (buf .remaining () < 1 + 16 + 8 )
334
+ if (buf .remaining () < readQueryPropertyRecordSize () )
358
335
return false ;
359
336
360
337
GridCacheQueryType qryType = GridCacheQueryType .fromOrdinal (buf .get ());
361
338
UUID uuid = readUuid (buf );
362
339
long id = buf .getLong ();
363
340
341
+ forwardRead (name );
342
+ forwardRead (val );
343
+
364
344
for (PerformanceStatisticsHandler hnd : curHnd )
365
- hnd .queryProperty (nodeId , qryType , uuid , id , name , val );
345
+ hnd .queryProperty (nodeId , qryType , uuid , id , name . str , val . str );
366
346
367
347
return true ;
368
348
}
369
349
else if (opType == TASK ) {
370
- if (buf .remaining () < 1 )
371
- return false ;
372
-
373
- boolean cached = buf .get () != 0 ;
374
-
375
- String taskName ;
376
- int hash = 0 ;
377
-
378
- if (cached ) {
379
- if (buf .remaining () < 4 )
380
- return false ;
381
-
382
- hash = buf .getInt ();
350
+ ForwardableString taskName = readString (buf );
383
351
384
- taskName = knownStrs .get (hash );
385
-
386
- if (buf .remaining () < taskRecordSize (0 , true ) - 1 - 4 )
387
- return false ;
388
- }
389
- else {
390
- if (buf .remaining () < 4 )
391
- return false ;
392
-
393
- int nameLen = buf .getInt ();
394
-
395
- if (buf .remaining () < taskRecordSize (nameLen , false ) - 1 - 4 )
396
- return false ;
397
-
398
- taskName = readString (buf , nameLen );
399
- }
352
+ if (taskName == null || buf .remaining () < readTaskRecordSize ())
353
+ return false ;
400
354
401
355
IgniteUuid sesId = readIgniteUuid (buf );
402
356
long startTime = buf .getLong ();
403
357
long duration = buf .getLong ();
404
358
int affPartId = buf .getInt ();
405
359
406
- if (taskName == null )
407
- forwardRead (hash );
360
+ forwardRead (taskName );
408
361
409
362
for (PerformanceStatisticsHandler hnd : curHnd )
410
- hnd .task (nodeId , sesId , taskName , startTime , duration , affPartId );
363
+ hnd .task (nodeId , sesId , taskName . str , startTime , duration , affPartId );
411
364
412
365
return true ;
413
366
}
@@ -427,41 +380,17 @@ else if (opType == JOB) {
427
380
return true ;
428
381
}
429
382
else if (opType == CACHE_START ) {
430
- if (buf .remaining () < 1 )
431
- return false ;
432
-
433
- boolean cached = buf .get () != 0 ;
383
+ ForwardableString cacheName = readString (buf );
434
384
435
- String cacheName ;
436
- int hash = 0 ;
437
-
438
- if (cached ) {
439
- if (buf .remaining () < 4 )
440
- return false ;
441
-
442
- hash = buf .getInt ();
443
-
444
- cacheName = knownStrs .get (hash );
445
-
446
- if (buf .remaining () < cacheStartRecordSize (0 , true ) - 1 - 4 )
447
- return false ;
448
- }
449
- else {
450
- if (buf .remaining () < 4 )
451
- return false ;
452
-
453
- int nameLen = buf .getInt ();
454
-
455
- if (buf .remaining () < cacheStartRecordSize (nameLen , false ) - 1 - 4 )
456
- return false ;
457
-
458
- cacheName = readString (buf , nameLen );
459
- }
385
+ if (cacheName == null || buf .remaining () < readCacheStartRecordSize ())
386
+ return false ;
460
387
461
388
int cacheId = buf .getInt ();
462
389
390
+ forwardRead (cacheName );
391
+
463
392
for (PerformanceStatisticsHandler hnd : curHnd )
464
- hnd .cacheStart (nodeId , cacheId , cacheName );
393
+ hnd .cacheStart (nodeId , cacheId , cacheName . str );
465
394
466
395
return true ;
467
396
}
@@ -524,8 +453,14 @@ else if (opType == PAGES_WRITE_THROTTLE) {
524
453
throw new IgniteException ("Unknown operation type id [typeId=" + opTypeByte + ']' );
525
454
}
526
455
527
- /** Turns on forward read mode. */
528
- private void forwardRead (int hash ) throws IOException {
456
+ /**
457
+ * Enables forward read mode when {@link ForwardableString#str} is null.
458
+ * @see ForwardableString
459
+ */
460
+ private void forwardRead (ForwardableString forwardableStr ) throws IOException {
461
+ if (forwardableStr .str != null )
462
+ return ;
463
+
529
464
if (forwardRead != null )
530
465
return ;
531
466
@@ -543,7 +478,7 @@ private void forwardRead(int hash) throws IOException {
543
478
544
479
curHnd = NOOP_HANDLER ;
545
480
546
- forwardRead = new ForwardRead (hash , curRecPos , nextRecPos , bufPos );
481
+ forwardRead = new ForwardRead (forwardableStr . hash , curRecPos , nextRecPos , bufPos );
547
482
}
548
483
549
484
/** Resolves performance statistics files. */
@@ -585,9 +520,31 @@ static List<File> resolveFiles(List<File> filesOrDirs) throws IOException {
585
520
return null ;
586
521
}
587
522
588
- /** Reads string from byte buffer. */
589
- private String readString (ByteBuffer buf , int size ) {
590
- byte [] bytes = new byte [size ];
523
+ /**
524
+ * Reads cacheable string from byte buffer.
525
+ *
526
+ * @return {@link ForwardableString} with result of reading or {@code null} in case of buffer underflow.
527
+ */
528
+ private ForwardableString readString (ByteBuffer buf ) {
529
+ if (buf .remaining () < 1 + 4 )
530
+ return null ;
531
+
532
+ boolean cached = buf .get () != 0 ;
533
+
534
+ if (cached ) {
535
+ int hash = buf .getInt ();
536
+
537
+ String str = knownStrs .get (hash );
538
+
539
+ return new ForwardableString (str , hash );
540
+ }
541
+
542
+ int textLen = buf .getInt ();
543
+
544
+ if (buf .remaining () < textLen )
545
+ return null ;
546
+
547
+ byte [] bytes = new byte [textLen ];
591
548
592
549
buf .get (bytes );
593
550
@@ -598,32 +555,24 @@ private String readString(ByteBuffer buf, int size) {
598
555
if (forwardRead != null && forwardRead .hash == str .hashCode ())
599
556
forwardRead .found = true ;
600
557
601
- return str ;
558
+ return new ForwardableString ( str , str . hashCode ()) ;
602
559
}
603
560
604
561
/**
605
- * Reads cacheable string from byte buffer.
606
- *
607
- * @return String or {@code null} in case of buffer underflow.
562
+ * Result of reading string from buffer that may be cached.
563
+ * Call {@link #forwardRead(ForwardableString)} after reading the entire record to enable forward read mode.
608
564
*/
609
- private String readCacheableString ( ByteBuffer buf ) {
610
- if ( buf . remaining () < 1 + 4 )
611
- return null ;
565
+ private static class ForwardableString {
566
+ /** Can be {@code null} if the string is cached and there is no such {@link #hash} in {@link #knownStrs}. */
567
+ @ Nullable final String str ;
612
568
613
- boolean cached = buf .get () != 0 ;
614
-
615
- if (cached ) {
616
- int hash = buf .getInt ();
617
-
618
- return knownStrs .get (hash );
619
- }
620
- else {
621
- int textLen = buf .getInt ();
622
-
623
- if (buf .remaining () < textLen )
624
- return null ;
569
+ /** */
570
+ final int hash ;
625
571
626
- return readString (buf , textLen );
572
+ /** */
573
+ ForwardableString (@ Nullable String str , int hash ) {
574
+ this .str = str ;
575
+ this .hash = hash ;
627
576
}
628
577
}
629
578
0 commit comments