Announcement

Collapse
No announcement yet.

Get a response from several destinations simultaneously

Collapse
X
 
  • Filter
  • Time
  • Show
Clear All
new posts

  • Get a response from several destinations simultaneously

    Please help.

    My channel sends a request to a number of other systems, must wait for the response of each system, and combine all the responses into one.
    Exact number of systems is not known.
    I go through all system as:

    for (address in addresses) {
    reply = router.routeMessage('ContactSystemChannel', address);
    }

    It works but sequentially.
    How can I execute requests to several systems simultaneously?

    I set Max processing threads to > 1 in ContactSystemChannel, it does not help. I can not ON the Source queue because I need the result of channel execution in the response.

    What I can do is to set several destinations in the channel.
    Destination 1 calls:
    reply = router.routeMessage('ContactSystemChannel', address1);

    Destination 2 calls:
    reply = router.routeMessage('ContactSystemChannel', address2);

    etc.

    and remove Wait for previous destination checkmark for all destinations. Destinations are executed simultaneously and I have what I want.
    But I don't know the exact number of systems, therefore I must set more destinations than systems and remove extra destinations from the DestinationSet in the source transformer.
    It looks ugly. And I'm not sure that my channel has enough of destinations and there will not be more systems than destinations someday.

    Is there a better way to get the response from several destinations simultaneously?

    Thanks

  • #2
    To clarify...

    You take a source message from the Sending System and need to route it to multiple systems and that number changes over time?
    You must then take the response of SENT/FILTERED/ERROR from each of the destination systems combine that into 1 response and send it back to the Sending System?

    Comment


    • #3
      Yes, that's correct. Actually, I must collect not response status SENT/FILTERED/ERROR, but response message from each of the destination systems. That's not a big difference.
      Number of systems is not changed often but may be changed after channel is installed. I want to make channel generic to exclude need of channel configuration every time when number of systems is changed.

      Comment


      • #4
        Sounds like the answer is Response: Postprocessor.
        Then you write a postprocessor script to the channel that loops through the destination list gathering the messages and returning the value desire. Postprocessor runs after all destinations have responded and doesn't track a destination subset. By using a list of destinations the programmer will not care if the value is X or X+1.
        This is tricky because most of the variables and maps used in the transformer steps are out of scope and not accessible.


        Please post a mockup of a few short destination responses and your desired result. 3 destinations should do the trick for this test as we could see what happens when each destination responds with a different value.

        Comment


        • #5
          I know about Postprocessor and that was my plan - write a script in postprocessor to combine all responses into one. In response I get a small JSON string. I planned to use global channel map to pass responses to Postprocessor, and now I see that this may be the next problem.

          My question was: Now I have 2 systems I must send a request and receive a response from, I configure 2 Destinations in the channel, uncheck "Wait for previous destination" checkmark and two destinations are executed concurrently. If system 3 is added I must add Destination 3 for it. How can I do it without cloning one of the Destinations manually? It is not possible to add Destinations at runtime, as far as I know, it is only possible to delete Destinations. So I can foresee that system 3 may appear and add Destination 3 to the channel initially. While system 3 is not there I delete Destination 3 from DestinationSet for each message and after system 3 is added I use Destination 3 to send a request and receive a response from it.
          And what if one day system 4 is added? It will require Destination 4. It seems that initially, channel must have a number of Destinations not less than the maximal number of systems expected.
          I just wanted to ensure that this is the only way - make as many Destinations as a maximal number of systems expected. Or there is a more elegant solution, like add a new Destination when it is needed? When I use router.routeMessage() I can easily call it as many times as I have systems, but I can not make these calls concurrent. Destinations can be executed concurrently, but can I add a new Destination when I need it?

          Thank you for participating, btw

          Comment


          • #6
            You can do this in a transformer or javascript writer by calling router.routeMessage from different threads. One way to do this is using a java ThreadPoolExecutor.

            Calling Channel
            I set up the ThreadPoolExecutor once in the deploy script of the calling channel and store in the globalChannelMap. There are other ways to do this to control the number and lifetime of threads.
            Code:
            $gc('threadPoolExecutor', java.util.concurrent.Executors.newCachedThreadPool());
            All data types are set to Raw except Destination 1 outbound data type is JSON.

            In the source tab of the channel I set the response to Destination 1.

            Destination 1 is a Javascript Writer with the following single line script:
            Code:
            return connectorMessage.getEncodedData();
            Destination 1 transformer looks like this:
            Code:
            function getAddressProcessor(address) {
                return new java.util.concurrent.Callable({call: function() {
                    var result = {
                        startTime: java.time.LocalDateTime.now(),
                        address: address
                    }
                    // send msg as the message and address as a sourceMap variable
                    result.response = router.routeMessage('ContactSystemChannel', new RawMessage(msg, null, {address:address}));
                    result.runDuration = java.time.Duration.between(result.startTime, java.time.LocalDateTime.now());
                    return result;
                }});
            }
            
            // get from configurationMap or use default value if not set. All configurationMap values are Strings
            var addresses = $cfg('destination addresses') || '["http://address1.com","http://address2.com","http://address3.com","http://address4.com","http://address5.com"]';
            
            addresses = JSON.parse(addresses);
            
            // initialize outbound object
            var response = {
                results: {},
                statuses: {},
                startTime: java.time.LocalDateTime.now()
            };
            
            var futureList = $gc('threadPoolExecutor').invokeAll(addresses.map(getAddressProcessor));
            
            for (var future in Iterator(futureList)) {
                 var result = future.get();
                 var r = {};
                 r.status = result.response.status.toString();
                 r.message = result.response.message;
                 r.duration = result.runDuration.toString();
                 r.startTime = result.startTime.toString();
                 response.results[result.address] = r;
                 if (response.statuses[r.status]) {
                 response.statuses[r.status]++;
                 }
                 else {
                 response.statuses[r.status] = 1;
                 }
            }
            response.runDuration = java.time.Duration.between(response.startTime, java.time.LocalDateTime.now()).toString();
            
            // set response object as the outbound message
            msg = response;
            ContactSystemChannel
            For my test channel I chose Destination 1 as the response. The number of source threads on this channel will determine how many requests can process concurrently. If there are fewer source threads than number of addresses, then some of them will wait until threads become available. I gave mine 5 threads.

            Destination 1 is a javascript writer:
            Code:
            var status = Math.random() > 0.5 ? Status.SENT : Status.ERROR;
            java.lang.Thread.sleep(Math.floor(Math.random() * 10000));
            return new Response(status, $('address') + ' ' + connectorMessage.getEncodedData());
            Test and response
            I sent the calling channel a message of "This is a test message."
            This is the response that was sent out:
            Code:
            {
              "results" : {
                "http://address1.com" : {
                  "status" : "ERROR",
                  "message" : "http://address1.com This is a test message",
                  "duration" : "PT9.455398S",
                  "startTime" : "2020-07-30T07:47:42.730203"
                },
                "http://address2.com" : {
                  "status" : "SENT",
                  "message" : "http://address2.com This is a test message",
                  "duration" : "PT7.734207S",
                  "startTime" : "2020-07-30T07:47:42.732233"
                },
                "http://address3.com" : {
                  "status" : "SENT",
                  "message" : "http://address3.com This is a test message",
                  "duration" : "PT4.856515S",
                  "startTime" : "2020-07-30T07:47:42.735372"
                },
                "http://address4.com" : {
                  "status" : "ERROR",
                  "message" : "http://address4.com This is a test message",
                  "duration" : "PT6.515621S",
                  "startTime" : "2020-07-30T07:47:42.745642"
                },
                "http://address5.com" : {
                  "status" : "SENT",
                  "message" : "http://address5.com This is a test message",
                  "duration" : "PT9.009106S",
                  "startTime" : "2020-07-30T07:47:42.747295"
                }
              },
              "statuses" : {
                "ERROR" : 2,
                "SENT" : 3
              },
              "runDuration" : "PT9.460324S"
            }
            You can see the total run time was only 5ms slower than the slowest address, and some of that was spent creating the response.

            Comment


            • #7
              This is exactly what I need. Thank you very much for your help!

              Comment

              Working...
              X