Monday, April 29, 2013

Using RabbitMQ from COBOL

Introduction

Measured in terms of lines of code (an arguably dubious measure), COBOL currently makes up around 65% of all the software used in the world today. It has been estimated that there are some 310 billion lines of COBOL code currently in use, and approximately 5 billion new lines of COBOL are written each year (and hopefully some old lines of code are deleted too). Love it or loathe it, COBOL is not going away any time soon; however, many organisations are faced with the dilemma of what to do with these legacy applications. On one hand these applications generally fulfil important business-critical functions; on the other hand they are often viewed as liabilities that impede progress and business growth.

Accordingly, many organisations spend disturbingly large amounts of money trying to redevelop legacy COBOL applications or to convert them into a more modern programming language, in the belief that such approaches are necessary to preserve their investment in the software and to allow it to take advantage of new technologies that will deliver significant business benefits. However, such drastic and costly action might not always be required. While languages such as COBOL may be considered old and unfashionable, from a technical perspective there is generally nothing to prevent applications written in such languages from leveraging new technologies, be they Open Source or commercial.

This post discusses how COBOL-based applications (or indeed applications written using other 3GLs) can utilize RabbitMQ to participate in a modern standards-based Open Source message queuing ecosystem. While the discussion focuses on interaction of COBOL code with RabbitMQ, the approaches described are directly applicable to many other situations.

Some possible approaches

Client APIs for RabbitMQ are available in many languages. Arguably the most popular APIs are those for languages such as Python, Ruby, and PHP; however the Java and .NET APIs are also widely used, and adoption of the Erlang client appears to be increasing. For use with C/C++ there is the rabbitmq-c API (see https://github.com/alanxz/rabbitmq-c) and the related C++ implementation (see https://github.com/alanxz/libamqp-cpp); however there are no native language client APIs available for other 3GL-style languages such as COBOL or FORTRAN. This is perhaps something of a gap, given the large body of legacy code written in such languages that could potentially take considerable advantage of the capabilities of RabbitMQ, be it to address new integration requirements or as a cost-effective and fully supported replacement technology for existing proprietary message queuing software. However, 3GLs such as C, COBOL, and FORTRAN have something in common: they are all compiled into language-independent object code. This means that so long as developers understand the key differences between these various languages in terms of argument passing mechanisms, data type representations, and so on, there is essentially nothing to preclude the creation of mixed-language applications where code written in one language calls functions written in another. Indeed, most COBOL runtime libraries will have been written in C as opposed to COBOL. This perhaps somewhat obvious observation therefore suggests one possible approach to using RabbitMQ from COBOL, namely to implement on top of an existing API such as rabbitmq-c some sort of wrapper layer that is more readily amenable to being called directly from COBOL.

This wrapper layer would be written in C and would handle all aspects of interfacing between C and COBOL. One approach to implementing such a wrapper layer is to simply implement a 1:1 mapping between functions in the rabbitmq-c API and the set of functions that will be called from the COBOL code. This approach will typically yield the most flexible wrapper solution; however it is also likely to be the most problematic wrapper solution to implement and the most complex solution for COBOL developers to incorporate into their code. A better approach is instead to consider the specific requirements of the COBOL application on a case-by-case basis and try to identify ways in which the wrapper API might be implemented to address those requirements through the provision of a clean and simple interface. For example, when establishing a RabbitMQ session, the sequence of events is to create a connection handle, open the network connection, log into RabbitMQ specifying various parameters, and then open one or more channels on the connection for subsequent use when publishing or consuming messages. At a minimum it is likely to be possible to combine the first three of these operations into a single wrapper API call, and if it is determined that only a single channel is required then all four operations may be combined into a single wrapper call. Similarly, when disconnecting from RabbitMQ, the operations of closing the channel and destroying the connection may also be implemented as a single wrapper API call. To allow for multiple connections, the initial “connect” API call would return a handle pointing to a structure holding connection details and session state. This handle would then be passed to all subsequent API calls, and the handle and any associated dynamically allocated memory would be freed up as part of the wrapper API “disconnect” call. How the handle is managed internally within the wrapper API is a matter of personal choice. For example, it may simply be a pointer to a dynamically allocated structure, or it may be an unsigned integer variable large enough to store the address of any such dynamically allocated structure[1].
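The integer-handle idea can be sketched in a few lines of C. This is only an illustration under stated assumptions: the structure rmq_session_t, the functions demo_connect and demo_disconnect, and the 1/0 return convention are hypothetical names and choices made for this sketch, and the real rabbitmq-c calls are deliberately left out and marked with comments.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical session state. A real wrapper would also keep the
   rabbitmq-c connection object here. */
typedef struct {
    int  channel;           /* channel opened on the connection */
    char last_error[128];   /* text of the most recent error, if any */
} rmq_session_t;

/* "Connect": allocate the session and return its address as an integer
   handle. Returns 1 on success and 0 on failure (assumed convention). */
int demo_connect(uint64_t *handle)
{
    rmq_session_t *s;

    assert(handle != NULL);
    *handle = 0;

    s = calloc(1, sizeof *s);
    if (s == NULL)
        return 0;

    /* The real connect, login, and channel-open calls would go here. */
    s->channel = 1;

    *handle = (uint64_t)(uintptr_t)s;
    return 1;
}

/* "Disconnect": recover the pointer from the handle and release it. */
int demo_disconnect(uint64_t handle)
{
    rmq_session_t *s = (rmq_session_t *)(uintptr_t)handle;

    if (s == NULL)
        return 0;

    /* The real channel-close and connection teardown would go here. */
    free(s);
    return 1;
}
```

Storing the address in a 64-bit unsigned integer means the COBOL side only ever sees a plain numeric item that it passes back unchanged, which avoids any dependence on how a particular COBOL compiler represents pointers.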

Figure 1. Creating a simple wrapper API on top of rabbitmq-c that is amenable to being called from COBOL provides a simple solution to allow COBOL applications to interact with RabbitMQ. The wrapper API need only support the specific functionality that is required by the calling COBOL application, which in many situations may equate to just three or four functions. Handling output parameters and complex data types can present some challenges; however in most cases such matters can be easily handled.

Other decisions that need to be made with regard to this type of approach include (but are not limited to) how to deal with strings and numeric data types. In COBOL, strings are generally fixed length (space-padded to their maximum length if necessary), while C strings are null-terminated. It is possible to place a null character (ASCII 0, the C NUL terminator) into a COBOL string before passing the string into a C function; however this is a potentially risky approach, as it relies on developers remembering what needs to be done. Failure to include the terminator will likely cause the program to crash, and it will not always be readily obvious what the problem is. Such an approach is also somewhat messy, and very much detracts from the notion that the wrapper layer is intended to provide a clean and simple interface between the two language environments. A better approach is for the wrapper API to take two arguments for each string (one being a pointer to the string, and the second being its length) and for the wrapper code to implement two private functions to convert between null-terminated strings and fixed-length space-padded strings.
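A minimal sketch of the two conversion helpers might look as follows. The function names cobol_to_c and c_to_cobol are hypothetical, and the choices to strip trailing spaces and to truncate silently are assumptions made for this illustration rather than a description of any particular wrapper.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* COBOL to C: copy a fixed-length field, drop trailing spaces, and add
   the terminating '\0'. The caller frees the returned string. */
char *cobol_to_c(const char *field, size_t len)
{
    char *out;

    assert(field != NULL);
    while (len > 0 && field[len - 1] == ' ')
        len--;

    out = malloc(len + 1);
    if (out == NULL)
        return NULL;

    memcpy(out, field, len);
    out[len] = '\0';
    return out;
}

/* C to COBOL: copy into a fixed-length field, truncating if the source
   is too long and space-padding if it is too short. No terminator is
   written, since COBOL does not expect one. */
void c_to_cobol(const char *src, char *field, size_t len)
{
    size_t n;

    assert(src != NULL && field != NULL);
    n = strlen(src);
    if (n > len)
        n = len;

    memcpy(field, src, n);
    memset(field + n, ' ', len - n);
}
```

With helpers of this kind in place, the COBOL caller never has to think about terminators: it passes the field and a length, and everything else happens inside the wrapper.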

COBOL supports a range of numeric data types, many of which do not readily map to native C data types; however COBOL also includes a rich set of operations to convert numeric values from one data type to another. To avoid having to deal with this myriad of weird and wonderful numeric data types, the simplest approach is to restrict the wrapper API to accepting only those types that map exactly to atomic C data types. Note however that such mappings can vary from one COBOL implementation to another. For example, when using HP COBOL on OpenVMS, the COBOL type PIC 9(9) COMP is equivalent to a long in C; however with OpenCOBOL (see http://www.opencobol.org/) the COBOL type binary-long must be used in this context. 

Once connected to the RabbitMQ broker, the fundamental operations performed by client applications are to publish and consume messages. Additional operations such as the dynamic creation and deletion of exchanges, queues, and bindings might also be performed by clients, depending on how applications are designed. Ignoring these other operations for the moment and focussing on publishing and consuming messages, from a programmatic perspective using the rabbitmq-c API, publishing messages is considerably more straightforward than consuming, generally involving little more than a call to amqp_basic_publish(), and with the exception of the “properties” parameter the arguments to this function are readily mapped between C and COBOL. The string parameters (the exchange name and the routing key) can be handled as described above by passing the addresses of the strings and their lengths, and the message body can be handled in an analogous fashion (since the message is just an array of bytes, just like a fixed-length COBOL string). The channel number and other numeric arguments can be passed by value directly (so long as appropriate COBOL numeric types have been used), and the connection handle may be dealt with as described above. The “properties” parameter of the amqp_basic_publish() call is somewhat more problematical to deal with from COBOL; however this parameter will often not need to be used (default properties will be sufficient), or if it is used then it will only be to specify a small subset of properties, such as the delivery mode, message content type, or possibly to include custom headers. Accordingly, it is usually possible to accommodate the “properties” parameter without too much difficulty, and various approaches to dealing with this and other structures are discussed elsewhere in this post.
 

Publishing messages

Taking the preceding discussion into consideration, it is possible to create on top of rabbitmq-c a greatly simplified API that can be readily called from COBOL to publish messages to the RabbitMQ broker. The following COBOL code (implemented using OpenCOBOL) illustrates the use of such an API developed for the purposes of this post. There is clearly room for improvement to this code (such as the removal of hard-coded string lengths); however it has been deliberately structured for clarity and simplicity, as opposed to conformance to any sort of best programming practice:

identification division.
program-id.    demo01.
data division.
working-storage section.

01 rv                   binary-long.

01 url                  pic x(50) value "amqp://16.156.32.108".
01 exchange             pic x(50) value "amq.direct".
01 routing-key          pic x(50) value "test-key".
01 msg                  pic x(50) value "A test message".

01 error-text           pic x(100).

01 handle               binary-double unsigned.


procedure division.
00.
        call "RMQ_CONNECT" using
                        by reference handle
                        by reference url
                        by value 20
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value 0
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_PUBLISH" using
                        by value handle
                        by reference exchange
                        by value 10
                        by reference routing-key
                        by value 8
                        by value 0
                        by value 0
                        by reference msg
                        by value 14
                        by value 0
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.


        call "RMQ_DISCONNECT" using by value handle.
        stop run.

end program demo01.

The function RMQ_CONNECT calls the standard sequence of rabbitmq-c functions to connect and login to the target RabbitMQ broker and to open a channel on the connection[2]. Assuming that these operations complete successfully, handle will hold the address of a pointer to a simple structure containing the connection and channel details, plus details of the last error that occurred (if any). In accordance with the preceding discussion, the URI connect-string (amqp://16.156.32.108) is passed to RMQ_CONNECT specifying both its address and length. If the function encounters an error then a return value of 0 will be given, whereupon the function RMQ_STRERROR may be used to display any error text that might be available. Note that if RMQ_CONNECT returns an error status, it cannot be assumed that handle is valid, and a null handle is therefore supplied to RMQ_STRERROR in this instance.

Assuming that RMQ_CONNECT completes successfully, the code then calls RMQ_PUBLISH to publish a message to the broker. This function is essentially just a simple wrapper on top of the rabbitmq-c API function amqp_basic_publish(). As per RMQ_CONNECT, both the address and length of string arguments are specified, and the function returns an integer status to indicate success or failure. The connection handle is passed by value (this value being the address of the structure holding connection and channel details). The last argument to RMQ_PUBLISH is the address of a “properties” structure, which for this example is null (“by value 0”), meaning that default properties will be used for the publish operation. As commented previously, methods for dealing with this and other structures are discussed elsewhere in this post. As per the call to RMQ_CONNECT, the return status of RMQ_PUBLISH is checked, and any error text is displayed. Lastly, the example code calls RMQ_DISCONNECT to disconnect from the broker and to free up any resources associated with the supplied connection handle. The function closes the channel, destroys the connection to the broker, and frees up memory associated with the connection handle structure.
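To make the error-reporting convention more concrete, the following C sketch shows one way a function in the style of RMQ_STRERROR could behave. It is not the code used for this post: the names rmq_session_t, global_error, and demo_strerror are hypothetical, and the idea of keeping a separate error text for the "no valid handle" case is an assumption made for the sketch.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical session structure holding the last error text. */
typedef struct {
    char last_error[128];
} rmq_session_t;

/* Assumed fallback text for failures that occur before a valid handle
   exists, for example a failed connect. */
const char global_error[128] = "connection failed";

/* Copy the relevant error text into a fixed-length COBOL buffer,
   truncating or space-padding to exactly buflen bytes.
   Returns 1 on success and 0 if the buffer length is unusable. */
int demo_strerror(uint64_t handle, char *buf, int32_t buflen)
{
    const rmq_session_t *s = (const rmq_session_t *)(uintptr_t)handle;
    const char *text = (s != NULL) ? s->last_error : global_error;
    size_t n, cap;

    assert(buf != NULL);
    if (buflen <= 0)
        return 0;

    cap = (size_t)buflen;
    n = strlen(text);
    if (n > cap)
        n = cap;

    memcpy(buf, text, n);
    memset(buf + n, ' ', cap - n);
    return 1;
}
```

Because the output is space-padded rather than null-terminated, the COBOL caller can display the buffer directly without any post-processing.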

The effort required to implement a high-level wrapper API on top of rabbitmq-c that can be called from COBOL to publish messages to the RabbitMQ broker is not significant (possibly a day of effort), and as can be seen from the example above, the resulting COBOL code is very straightforward. Some complexity may arise if the "properties" argument needs to be used; however even in such circumstances the additional code will be minimal (see elsewhere in this document). In addition, the resultant wrapper API is sufficiently generic that it can be used with multiple variants of COBOL and with other languages. For example, the following code illustrates the same example implemented in HP FORTRAN on HP OpenVMS. 

        program demo01
        implicit none

        external                RMQ_CONNECT
        integer*4               RMQ_CONNECT
        external                RMQ_PUBLISH
        integer*4               RMQ_PUBLISH

        character*100           error_text
        integer*4               rv
        integer*8               handle


        rv = RMQ_CONNECT(handle, %ref('amqp://16.156.32.108'), %val(20))

        if (rv .eq. 0) then
           call RMQ_STRERROR(%val(0), %ref(error_text), %val(50))
           print *, error_text
        end if

        rv = RMQ_PUBLISH(%val(handle),
        1                %ref('amq.direct'),
        2                %val(10),
        3                %ref('test-key'),
        4                %val(8),
        5                %val(0),
        6                %val(0),
        7                %ref('A test message'),
        8                %val(14),
        9                %val(0))

        if (rv .eq. 0) then
           call RMQ_STRERROR(%val(handle), %ref(error_text), %val(50))
           print *, error_text
        end if

        call RMQ_DISCONNECT(%val(handle))

        end

Note that HP FORTRAN on OpenVMS by default passes string variables by descriptor (a built-in mechanism for passing the address of the string and its length as a single argument), and it is therefore necessary to use the %ref modifier to override this default behaviour and explicitly pass string arguments by reference. An alternative approach would have been to implement a wrapper API that accepts descriptors; however this would have been less generic and somewhat platform-specific.

Consuming messages

In contrast to publishing messages, consuming messages via AMQP using the rabbitmq-c API involves a call to amqp_basic_consume() for each queue of interest followed by a loop that implements a reasonably complex sequence of function calls to process any received frames in order to extract message bodies and any other information that may be of interest. Depending on program design, the consumer may also be required to perform calls to acknowledge to the RabbitMQ broker any messages that have been received and successfully processed. All of this processing can be wrapped into a simplified set of high-level functions in a similar manner to that described above for publishing messages; however to illustrate that other approaches are possible, an alternative method will be used in this section.

This alternative approach is in some ways an extension of the wrapper-based approach described in the previous section, but it extends the concept to the creation of a generic consumer program (named amqp-server) that negates the need for developers to include any AMQP-related calls in their consumer code. The following output lists the various command line options supported by the amqp-server program implemented for the purposes of this discussion:

Usage: ./amqp-server [options] -s key[:function] -l image -q queue

Options:
    -s key[:function]     One or more binding keys (function names optional)
    -U username           Username (default "guest")
    -P password           Password (default "guest")
    -h hostname           Broker host (defaults to current host)
    -o filename           Write all output to the specified log file
    -p port               Broker port (default 5672)
    -v vhost              Virtual host (default "/")
    -e exchange           Exchange name (default "amq.direct")
    -l filename           Shared library
    -q queue              Queue name
    -d                    Enable debug-level logging
    -t                    Enable trace-level logging

    Use "-s @filename" to load service details from the specified file

The basic idea with amqp-server is to map binding keys to functions contained in a shared library. On start-up, amqp-server loads the specified shared library, and via one or more -s options (or via a file if there are a large number of mappings) it maps binding keys to functions in the shared library according to the syntax binding-key:function-name. The function name is optional, and if it is not specified then the binding key value will be used as the function name (which may not be valid in many instances, particularly for keys associated with topic exchanges). The binding keys are associated with the queue specified via the -q option and the exchange specified via the -e option (or the amq.direct exchange is used by default). The queue will be created if it does not already exist. After processing the various command line options, amqp-server listens for messages to process, and upon receipt of a message, if a valid mapping exists then the associated shared library function will be called. 
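The handling of the binding-key:function-name syntax can be sketched as a small parsing routine. The names service_map_t and parse_service are hypothetical, as are the buffer size, the rule that the first colon separates the two parts, and the treatment of a trailing colon as "no function name given". On UNIX, the resulting function name would presumably then be resolved in the shared library with something like dlsym().

```c
#include <assert.h>
#include <string.h>

#define MAP_NAME_LEN 128

/* One binding-key to function-name mapping (hypothetical layout). */
typedef struct {
    char key[MAP_NAME_LEN];
    char function[MAP_NAME_LEN];
} service_map_t;

/* Split "key[:function]" into its two parts. If no function name is
   given, the binding key is reused as the function name.
   Returns 1 on success and 0 if the argument is empty or too long. */
int parse_service(const char *arg, service_map_t *out)
{
    const char *colon;
    const char *fn;
    size_t klen;

    assert(arg != NULL && out != NULL);

    colon = strchr(arg, ':');
    klen = (colon != NULL) ? (size_t)(colon - arg) : strlen(arg);
    fn = (colon != NULL && colon[1] != '\0') ? colon + 1 : NULL;

    if (klen == 0 || klen >= MAP_NAME_LEN)
        return 0;

    memcpy(out->key, arg, klen);
    out->key[klen] = '\0';

    if (fn == NULL)
        fn = out->key;              /* default: function named after key */
    if (strlen(fn) >= MAP_NAME_LEN)
        return 0;

    strcpy(out->function, fn);
    return 1;
}
```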

Figure 2. By implementing a generic consumer such as amqp-server, it is possible for developers to write application code (in COBOL or otherwise) without any particular knowledge of AMQP or RabbitMQ. Application code is implemented as a shared library that is dynamically loaded by amqp-server. Functions within the shared library are associated with binding keys, and upon receipt of a message, amqp-server examines the key associated with the message and invokes the associated shared library function, passing it the message data and various metadata.  

Clearly there are many additional features that could be added to amqp-server to make it more flexible or to address specific requirements; however the simple implementation described here serves sufficiently well to illustrate the basic concept. From a development perspective all that is required to implement a consumer is to write code that conforms to the interface illustrated by the following piece of COBOL code and to build the code into a shareable image.

identification division.
program-id. func1.
data division.
working-storage section.

linkage section.
*
01 ctxt                 usage pointer.
01 idata                usage pointer.
01 ilen                 usage binary-long.
01 odata                usage pointer.
01 olen                 usage binary-long.


procedure division using ctxt, idata, ilen, odata, olen.
00.
        display idata(1:ilen).

end program func1.

The above COBOL code represents a single function named func1 that takes five arguments and has no return value. The first argument ctxt is the address of a structure populated by amqp-server with various data, such as the routing key, the correlation ID, and the name of the reply queue (if applicable). There are presently no functions provided to extract any of these details from the structure; however future enhancements might look to provide this functionality. The second and third arguments are the address of the consumed message and its length, and the final two arguments can be used to store the address of a return message and its length in order to cater for RPC-style use-cases. It should be noted that all function arguments are passed by reference, as this is a requirement for many COBOL implementations. Looking at it from a C language perspective, the prototype for shared library functions called by amqp-server is therefore as follows (and indeed amqp-server may be used with shared libraries written in C code):

extern void func1(void *ctxt, void *idata, int *ilen, void **odata, int *olen);
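Seen from the amqp-server side, calling such a function is a matter of looking up the routing key and invoking a function pointer. The sketch below is an assumption about how this could be done, not the actual amqp-server code: route_t, dispatch, and the sample handler record_length are hypothetical, olen is treated as a pointer to int in the same way as ilen, and the exact string comparison only suits direct exchanges (topic patterns would need real pattern matching).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Pointer type for shared library functions with the five-argument
   interface described above. */
typedef void (*service_fn)(void *ctxt, void *idata, int *ilen,
                           void **odata, int *olen);

/* One routing entry: a binding key and the function to call for it. */
typedef struct {
    const char *key;
    service_fn  fn;
} route_t;

/* Sample handler used only for illustration: remembers the length of
   the last message it was given and produces no reply. */
int handled_bytes = 0;

void record_length(void *ctxt, void *idata, int *ilen,
                   void **odata, int *olen)
{
    (void)ctxt; (void)idata; (void)odata; (void)olen;
    handled_bytes = *ilen;
}

/* Find the handler for routing_key and call it.
   Returns 1 if a handler was invoked and 0 if no mapping exists. */
int dispatch(const route_t *routes, size_t nroutes, const char *routing_key,
             void *ctxt, void *body, int body_len,
             void **reply, int *reply_len)
{
    size_t i;

    assert(routes != NULL && routing_key != NULL);
    assert(reply != NULL && reply_len != NULL);

    *reply = NULL;
    *reply_len = 0;

    for (i = 0; i < nroutes; i++) {
        if (strcmp(routes[i].key, routing_key) == 0) {
            /* All arguments are passed by reference, as COBOL expects. */
            routes[i].fn(ctxt, body, &body_len, reply, reply_len);
            return 1;
        }
    }
    return 0;
}
```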

The above piece of COBOL code for func1 was written for use with OpenCOBOL, and can be compiled and linked into a UNIX shared library according to the following command, assuming that the code resides in the file funcs.cob.

$ cobc -free -fimplicit-init -fstatic-call -m funcs.cob

The above command will compile and link funcs.cob into a shared library named funcs.so, which can then be used by amqp-server as follows:

$ ./amqp-server -h az2-2xl-1 -l ./funcs.so -q boris -e amq.direct -s test-key:func1

This command instructs amqp-server to load the shared library funcs.so and to connect to the RabbitMQ broker running on the specified server. Default values for the AMQP port number, username, and password will be used; the queue "boris" will be created if necessary, and the queue will be bound to the exchange amq.direct with binding key "test-key". Any messages published to the amq.direct exchange with routing key test-key will be consumed by amqp-server, and func1 will be called to process each such message. 

It should be noted that funcs.cob may contain multiple functions conforming to the format described above, or indeed the shared library to be loaded by amqp-server may be built from multiple source files. The inclusion of multiple functions is illustrated in the next section.

The advantage of this approach to implementing consumers is that application developers require minimal knowledge of RabbitMQ and AMQP in order to be productive, and they typically do not need to incorporate any RabbitMQ or AMQP-specific API calls into their code. This approach also has the advantage of being potentially more amenable to use by existing legacy application code, as essentially all that is required to incorporate such code into the AMQP 0.9.1 environment is to expose the necessary functionality as a set of API functions that conform to the function prototype required by amqp-server and to be able to build the code into a shared library[3]. A disadvantage of the generic consumer approach is that the generic consumer supports only a specific subset of AMQP features and thereby constrains client consumer applications in terms of what they can and cannot do. However, the effort required to implement this type of generic consumer model is not great (perhaps two days of work are required to implement something like amqp-server), and the basic implementation described here may be easily extended to include additional options and functionality[4]. Aside from support for various AMQP features, non-functional requirements such as scalability and availability may also need to be taken into consideration when designing and building a generic consumer.

RPC server use-case

As commented above, if a message consumed by amqp-server specifies a reply queue then amqp-server operates in accordance with the RPC messaging pattern[5] and assumes that a response message must be sent back to the associated client process. All of the complexity associated with RPC processing such as determining the name of the reply queue and correlation ID is handled by amqp-server, and all that the user-written code needs to do is to ensure that valid values for the response message and its length are returned in the output variables odata and olen described above, as illustrated by the following OpenCOBOL example.

identification division.
program-id. func2.
data division.
working-storage section.

linkage section.
*
01 ctxt                 usage pointer.
01 idata                usage pointer.
01 ilen                 usage binary-long.
01 odata                usage pointer.
01 olen                 usage binary-long.

01 txt                  pic x(60) based.

procedure division using ctxt, idata, ilen, odata, olen.
00.
        display idata(1:ilen).

        allocate (60) characters initialized returning odata.
        set address of txt to odata.
        move "This is the reply" to txt.
        move 17 to olen.

end program func2.

The example code displays the received message, allocates memory for the RPC response message and assigns the starting address of the allocated memory to odata, populates the response buffer with the desired message, and populates olen with the length of the response message.
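Since amqp-server can also load shared libraries written in C, the same kind of handler can be sketched in C for comparison. The function name reply_handler is hypothetical, and the use of malloc() for the reply buffer is an assumption; which side eventually frees the buffer depends on how amqp-server is implemented.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* C equivalent of the COBOL handler: display the received message and
   hand back a dynamically allocated reply together with its length. */
void reply_handler(void *ctxt, void *idata, int *ilen,
                   void **odata, int *olen)
{
    static const char reply[] = "This is the reply";
    size_t n = sizeof reply - 1;        /* length without the '\0' */

    (void)ctxt;                         /* context is not used here */
    assert(idata != NULL && ilen != NULL);
    assert(odata != NULL && olen != NULL);

    /* The message is a counted byte array, not a C string, so print
       exactly *ilen bytes. */
    printf("%.*s\n", *ilen, (const char *)idata);

    *odata = malloc(n);
    if (*odata == NULL) {
        *olen = 0;                      /* signal "no reply" on failure */
        return;
    }

    memcpy(*odata, reply, n);
    *olen = (int)n;
}
```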

It was commented previously that differences between COBOL implementations need to be taken into careful consideration when mapping numeric data types between C and COBOL; however differences between implementations may also have ramifications in terms of how code is written, particularly with regard to the handling of pointers. OpenCOBOL (as used for most of the examples in this post) operates by translating COBOL code into C code, and the resultant C code is then compiled into object code using the chosen C compiler. The fact that OpenCOBOL operates in this manner makes possible the use of some constructs that might not be possible when using other COBOL compilers that compile directly to object code. For example, in the above piece of COBOL code OpenCOBOL allows the pointer idata (the received message) to be treated as a string such that its contents can be displayed using a statement of the form “display idata(1:ilen)”. This approach will not work with HP COBOL for OpenVMS (or indeed many other COBOL compilers), and it is instead necessary to resort to devious tactics to copy the message into an appropriately sized string variable defined in working storage, as shown below. 

identification division.
program-id. func2.
data division.
working-storage section.

01 msg                  pic x(100).

linkage section.
*
01 ctxt                 usage pointer.
01 idata                usage pointer.
01 ilen                 pic 9(9) comp.
01 odata                usage pointer.
01 olen                 pic 9(9) comp.

procedure division using ctxt, idata, ilen, odata, olen.
00.

        call "LIB$MOVC3" using
                        by reference ilen
                        by reference idata
                        by reference msg.

        display msg(1:ilen).

        move 'This is the reply' to msg.
        move 17 to olen.

        call "DECC$MALLOC" using
                        by value olen
                        giving odata.

        call "LIB$MOVC3" using
                        by reference olen
                        by reference msg
                        by value odata.

end program func2.

The above example also illustrates differences in numeric data type usage between OpenCOBOL and HP COBOL for OpenVMS code, and shows how OpenVMS C runtime library calls can be used from COBOL to dynamically allocate memory for the response message[6].

Generally speaking OpenCOBOL provides quite powerful facilities for dynamic memory management; however this may not be true of many other COBOL implementations, and alternative approaches may be required. In the preceding example using HP COBOL for OpenVMS, the C runtime function DECC$MALLOC (which equates to the standard malloc() function) was called directly from COBOL to allocate memory for the response message. Interacting directly with the C runtime library in this way might not be possible with other COBOL implementations, and in such situations a reasonable approach would be to write simple wrapper functions around functions such as malloc() and free() that can be more readily called from COBOL.
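Such wrapper functions can be very small. In the sketch below the names RMQ_ALLOC and RMQ_FREE are hypothetical (they are not part of the API used in the examples above), and the argument order and the 1/0 return convention are assumptions chosen to match the style of the other wrapper calls: the length is passed by value and the pointer variable by reference.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Allocate len bytes and store the address in the caller's pointer
   variable. Returns 1 on success and 0 on failure. */
int RMQ_ALLOC(int32_t len, void **ptr)
{
    assert(ptr != NULL);
    *ptr = NULL;

    if (len <= 0)
        return 0;

    *ptr = malloc((size_t)len);
    return *ptr != NULL;
}

/* Release memory obtained from RMQ_ALLOC and clear the caller's
   pointer so that it cannot be reused by accident. */
int RMQ_FREE(void **ptr)
{
    assert(ptr != NULL);
    free(*ptr);
    *ptr = NULL;
    return 1;
}
```

From COBOL, a call along the lines of call "RMQ_ALLOC" using by value olen by reference odata giving rv would then take the place of the direct call to the C runtime library.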

RPC client

The preceding section described how the generic consumer amqp-server could be used to address the RPC server use-case using COBOL, but what if we instead wished to implement an RPC client in COBOL? This scenario can be implemented using the wrapper technique, by creating a wrapper function on top of rabbitmq-c that handles the complexity associated with the AMQP-based RPC request and response processing, and provides a simple interface to the COBOL code. For example, consider the function RMQ_RPC_CALL in the following sample COBOL program.

identification division.
program-id.    demo02.
data division.
working-storage section.

01 rv                   binary-long.
01 len                  binary-long.

01 url                  pic x(50) value "amqp://16.156.32.108".
01 exchange             pic x(50) value "amq.direct".
01 routing-key          pic x(50) value "rpc-key".
01 rqst                 pic x(50) value "RPC test message".

01 repl                 pic x(100).
01 error-text           pic x(100).

01 handle               binary-double unsigned.


procedure division.
00.
        call "RMQ_CONNECT" using
                        by reference handle
                        by reference url
                        by value 20
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value 0
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        move 100 to len.

        call "RMQ_RPC_CALL" using
                        by value handle
                        by reference exchange
                        by value 10
                        by reference routing-key
                        by value 7
                        by reference rqst
                        by value 16
                        by reference repl
                        by reference len
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.


        display repl(1:len).

        call "RMQ_DISCONNECT" using by value handle.
        stop run.

end program demo02.

The RMQ_RPC_CALL function handles all aspects of the RPC request and response processing. At a high level, this processing includes setting up the reply queue[7] (if necessary) and ensuring that the message properties for the published message specify the name of the reply queue (via the reply_to property), publishing the request buffer, and waiting for and consuming the RPC reply message that is returned to the caller. Note that in this particular case, a fixed-size buffer is used to store the returned reply, and an error status will be reported by RMQ_RPC_CALL if the size of the reply exceeds the fixed-size limit. An alternative approach would be to dynamically allocate memory for the reply and for the client to free this memory when it is no longer required; however it is arguably more common for COBOL to work with fixed-size records, and dealing with dynamic memory allocation is not something that many COBOL programmers are necessarily familiar with or used to doing. Approaches to dynamic memory management with COBOL were briefly considered in the previous section.
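
The fixed-size reply handling described above can be sketched in C as follows. This is a minimal illustration only, not the code used by RMQ_RPC_CALL: the helper name copy_reply_to_fixed_buffer is hypothetical, and the space-filling of unused buffer bytes is an assumption made to suit COBOL pic x fields. As in demo02, the len argument is assumed to hold the buffer capacity on entry and the reply length on return.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not taken from the original wrapper): copy an RPC
 * reply into a fixed-size buffer supplied by the COBOL caller.
 *
 * On entry *len holds the capacity of buf (100 in demo02); on success it is
 * overwritten with the number of bytes copied. Returns 1 on success and 0
 * on failure, matching the "rv = 0 means error" checks in the COBOL code.
 */
int copy_reply_to_fixed_buffer(const void *reply, size_t reply_len,
                               char *buf, int *len)
{
    if (reply == NULL || buf == NULL || len == NULL || *len < 0) {
        return 0;
    }
    if (reply_len > (size_t) *len) {
        return 0;               /* reply exceeds the fixed-size limit */
    }

    memcpy(buf, reply, reply_len);

    /* Assumption: space-fill the remainder, as COBOL pic x fields expect */
    memset(buf + reply_len, ' ', (size_t) *len - reply_len);

    *len = (int) reply_len;
    return 1;
}
```

On failure the sketch leaves *len untouched, so the caller can still see the capacity it supplied.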

It should be noted that synchronous RPC processing is not a particularly efficient messaging use-case, and indeed it somewhat negates some of the fundamental aims of message queuing; however it remains a frequently used processing model, and it is a use-case that RabbitMQ readily supports.

Dealing with complex structures

AMQP operations such as publishing messages and declaring queues and exchanges permit the specification of various optional (and sometimes implementation-specific) attributes or properties. For example, when publishing messages it is possible to optionally specify properties such as the delivery mode, message content type, message TTL, correlation ID, and so on. The rabbitmq-c API provides various structures and constant definitions to support the specification of these attributes; however working with these C structures from COBOL can sometimes be challenging.

It will sometimes be possible to replicate C structures directly as COBOL records; however for complex structures, such as nested structures and structures containing pointers, unions, or enumerations, this direct approach can be problematic. There may also be non-obvious issues to consider, such as differences in how C and COBOL compilers align individual structure fields. Consequently, alternative approaches to working with structures in a mixed C/COBOL environment will often need to be considered.
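
The alignment issue can be made concrete with a small C sketch. The structure below is purely illustrative and is not part of rabbitmq-c; the function names are likewise hypothetical. A C compiler will usually insert padding after the one-byte field so that the four-byte field is suitably aligned, whereas a COBOL record declared without the SYNCHRONIZED clause will typically lay the same two fields out contiguously.

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative structure only; it is not part of rabbitmq-c. */
struct sample_record {
    uint8_t  delivery_mode;   /* 1 byte */
    uint32_t priority;        /* 4 bytes, usually aligned by the compiler */
};

/* Number of padding bytes the C compiler inserted between the two fields. */
size_t sample_record_padding(void)
{
    return offsetof(struct sample_record, priority) - sizeof(uint8_t);
}

/* Size the record would have if its fields were packed end to end. */
size_t sample_record_packed_size(void)
{
    return sizeof(uint8_t) + sizeof(uint32_t);
}
```

Comparing sizeof(struct sample_record) with sample_record_packed_size() on the target platform shows how far a naively declared COBOL record would be out of step with the C layout.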

One such alternative approach is to identify common scenarios (such as specifying a delivery mode when publishing messages) and to then predefine constant structures for those scenarios in the C wrapper API layer, and provide a means for COBOL code to access and use those structures. For example, the following C function defines a properties structure that can be used by RMQ_PUBLISH to publish messages to RabbitMQ with delivery mode 2 (messages published with delivery mode 2 that are delivered to durable queues will be persisted to disk if they are not consumed immediately):

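void *RMQ_MSG_PROPS_PERSISTENT(void)
{
        /* Positional initialiser; order follows amqp_basic_properties_t */
        static const amqp_basic_properties_t MESSAGE_PROPERTIES_PERSISTENT = {
                AMQP_BASIC_DELIVERY_MODE_FLAG,  /* _flags */
                { 0, NULL },            /* content_type */
                { 0, NULL },            /* content_encoding */
                { 0, NULL },            /* headers */
                2,                      /* delivery_mode: persistent */
                0,                      /* priority */
                { 0, NULL },            /* correlation_id */
                { 0, NULL },            /* reply_to */
                { 0, NULL },            /* expiration */
                { 0, NULL },            /* message_id */
                0,                      /* timestamp */
                { 0, NULL },            /* type */
                { 0, NULL },            /* user_id */
                { 0, NULL },            /* app_id */
                { 0, NULL }             /* cluster_id */
        };

        return ((void *) &MESSAGE_PROPERTIES_PERSISTENT);
}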

The function RMQ_MSG_PROPS_PERSISTENT() can then be used from COBOL as illustrated below. Note that for this particular example, the COBOL code stores the address of the predefined properties structure in an unsigned 64-bit integer variable. A pointer would arguably be more consistent; however not all COBOL implementations will necessarily support this. The differences between this example and the demo01 example given in the “Publishing messages” section all relate to props: its declaration, the call to RMQ_MSG_PROPS_PERSISTENT, and the final argument passed to RMQ_PUBLISH.

identification division.
program-id.    demo04.
data division.
working-storage section.

01 rv                   binary-long.

01 url                  pic x(50) value "amqp://127.0.0.1".
01 exchange             pic x(50) value "amq.direct".
01 routing-key          pic x(50) value "test-key".
01 msg                  pic x(50) value "A test message".

01 error-text           pic x(100).

01 handle               binary-double unsigned.
01 props                binary-double unsigned.


procedure division.
00.
        call "RMQ_CONNECT" using
                        by reference handle
                        by reference url
                        by value 16
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value 0
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_MSG_PROPS_PERSISTENT" giving props.

        call "RMQ_PUBLISH" using
                        by value handle
                        by reference exchange
                        by value 10
                        by reference routing-key
                        by value 8
                        by value 0
                        by value 0
                        by reference msg
                        by value 14
                        by value props
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.


        call "RMQ_DISCONNECT" using by value handle.
        stop run.

end program demo04.
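
The convention used in demo04, where an address is held by COBOL in an unsigned 64-bit integer and handed back by value, can be sketched from the C side as shown below. The names are hypothetical and the integer constant merely stands in for a predefined structure. The sketch assumes that pointers on the platform are no wider than 64 bits and that the compiler provides uintptr_t.

```c
#include <stdint.h>

/* Stand-in for a predefined structure such as the properties structure. */
static const int PREDEFINED_VALUE = 2;

/*
 * Return the address as an unsigned 64-bit integer, the form in which the
 * COBOL program holds it (binary-double unsigned).
 */
uint64_t address_as_handle(void)
{
    return (uint64_t) (uintptr_t) &PREDEFINED_VALUE;
}

/* Convert a handle received by value from COBOL back into a pointer. */
const int *handle_as_address(uint64_t handle)
{
    return (const int *) (uintptr_t) handle;
}
```

Going through uintptr_t rather than casting a pointer straight to a 64-bit integer keeps the conversion well defined on platforms where pointers are narrower than 64 bits.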

While straightforward to implement, the use of predefined structures in this manner is not suitable for situations that require the specification of variable attributes, such as a message TTL or correlation ID. In such situations, an often viable approach is to implement functions in the C wrapper API that can be called from COBOL to create and destroy structure instances and to get and set values for specific structure fields, as illustrated by the following example:

identification division.
program-id.    demo05.
data division.
working-storage section.

77 AMQP_BASIC_DELIVERY_MODE_FLAG
                        binary-long value 4096.
77 AMQP_BASIC_CONTENT_TYPE_FLAG
                        binary-long value 32768.

01 rv                   binary-long.

01 url                  pic x(50) value "amqp://127.0.0.1".
01 exchange             pic x(50) value "amq.direct".
01 routing-key          pic x(50) value "test-key".
01 msg                  pic x(50) value "A test message".
01 content-type         pic x(50) value "text/plain".

01 delivery-mode        binary-char value 2.
01 error-text           pic x(100).

01 handle               binary-double unsigned.
01 props                binary-double unsigned.


procedure division.
00.
        call "RMQ_CONNECT" using
                        by reference handle
                        by reference url
                        by value 16
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value 0
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_PROPS_NEW" giving props.

        call "RMQ_PROPS_SET" using
                        by value props
                        by value AMQP_BASIC_DELIVERY_MODE_FLAG
                        by reference delivery-mode
                        by value 0.

        call "RMQ_PROPS_SET" using
                        by value props
                        by value AMQP_BASIC_CONTENT_TYPE_FLAG
                        by reference content-type
                        by value 10.

        call "RMQ_PUBLISH" using
                        by value handle
                        by reference exchange
                        by value 10
                        by reference routing-key
                        by value 8
                        by value 0
                        by value 0
                        by reference msg
                        by value 14
                        by value props
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_PROPS_FREE" using by value props.
        call "RMQ_DISCONNECT" using by value handle.
        stop run.

end program demo05.

The function RMQ_PROPS_NEW allocates memory for a new instance of the properties structure (and returns the address of the newly allocated instance), and the function RMQ_PROPS_SET is called to set the delivery mode and content type attributes of the structure. The fourth parameter to RMQ_PROPS_SET specifies the size of the value being set (the third parameter). For the delivery mode, the size is implicit and the size parameter is therefore specified as 0; however when specifying the content type, it is necessary to specify the length of the supplied COBOL string. After publishing the message, the function RMQ_PROPS_FREE can be used to free the memory previously allocated by the call to RMQ_PROPS_NEW. Values for the constants AMQP_BASIC_DELIVERY_MODE_FLAG and AMQP_BASIC_CONTENT_TYPE_FLAG correspond to values defined in rabbitmq-c (and are as per the AMQP 0.9.1 specification). These and other constants associated with the specification of various properties and attributes would typically be defined in a COBOL copybook (the COBOL equivalent of a C header file) that would be included in the COBOL code.
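
The create/set/free pattern can be sketched in C as follows. This is not the code behind RMQ_PROPS_NEW, RMQ_PROPS_SET, and RMQ_PROPS_FREE: to keep the sketch self-contained it uses a simplified stand-in structure (sketch_props_t) in place of amqp_basic_properties_t, models only the two properties used in demo05, and assumes a return value of 1 for success and 0 for failure. All sketch_ names are hypothetical.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Flag values as declared in demo05 */
#define DELIVERY_MODE_FLAG 4096
#define CONTENT_TYPE_FLAG  32768

/* Simplified stand-in for amqp_basic_properties_t (assumption: only the
 * two fields used in demo05 are modelled). */
typedef struct {
    uint32_t flags;
    uint8_t  delivery_mode;
    size_t   content_type_len;
    char    *content_type;      /* private copy of the COBOL string */
} sketch_props_t;

/* Allocate a zeroed instance and return its address to the caller. */
void *sketch_props_new(void)
{
    return calloc(1, sizeof(sketch_props_t));
}

/* Set one property; returns 1 on success, 0 on failure. */
int sketch_props_set(void *handle, int flag, const void *value, int len)
{
    sketch_props_t *p = handle;
    char *copy;

    if (p == NULL || value == NULL || len < 0) {
        return 0;
    }

    switch (flag) {
    case DELIVERY_MODE_FLAG:
        /* Size is implicit (one byte), so len is ignored */
        p->delivery_mode = *(const uint8_t *) value;
        break;
    case CONTENT_TYPE_FLAG:
        /* COBOL strings are not NUL-terminated, so copy exactly len bytes */
        copy = malloc((size_t) len + 1);
        if (copy == NULL) {
            return 0;
        }
        memcpy(copy, value, (size_t) len);
        copy[len] = '\0';
        free(p->content_type);
        p->content_type = copy;
        p->content_type_len = (size_t) len;
        break;
    default:
        return 0;               /* property not handled by this sketch */
    }

    p->flags |= (uint32_t) flag; /* record which fields are populated */
    return 1;
}

/* Release the instance and any string copies it owns. */
void sketch_props_free(void *handle)
{
    sketch_props_t *p = handle;

    if (p != NULL) {
        free(p->content_type);
        free(p);
    }
}
```

Copying the string rather than keeping a pointer to the COBOL buffer means the properties remain valid even if the COBOL program later overwrites its working-storage field.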

Creating and destroying resources

As discussed previously, in addition to publishing and consuming messages, client applications might also be required to dynamically create and delete entities such as queues, exchanges, and bindings. One of the advantages of the AMQP model over more traditional message queuing technologies is that client programs can dynamically configure these entities at runtime, as opposed to requiring the configuration to be fully defined (using special configuration tools) before any client programs can be started.

The wrapper technique described above can be used to create on top of rabbitmq-c a simple set of functions that may be called from COBOL to perform these resource management functions. For example, the following simple COBOL program calls the functions RMQ_DECLARE_EXCHANGE, RMQ_DECLARE_QUEUE, and RMQ_BIND_QUEUE to create the exchange “cobol-exchange” and queue “cobol-queue”, and to bind the queue to cobol-exchange with binding key “cobol-key”. Note that this example was written for OpenCOBOL, and as discussed previously, minor changes to some of the variable declarations will likely be required for other COBOL variants.

identification division.
program-id.    demo03.
data division.
working-storage section.

01 rv                   binary-long.

01 url                  pic x(50) value "amqp://127.0.0.1".
01 exchange             pic x(50) value "cobol-exchange".
01 exchange-type        pic x(50) value "direct".
01 binding-key          pic x(50) value "cobol-key".
01 queue-name           pic x(50) value "cobol-queue".

01 passive              binary-long value 0.
01 durable              binary-long value 1.
01 exclusive-flag       binary-long value 0.
01 auto-delete          binary-long value 0.

01 error-text           pic x(100).

01 handle               binary-double unsigned.


procedure division.
00.
        call "RMQ_CONNECT" using
                        by reference handle
                        by reference url
                        by value 16
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value 0
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_DECLARE_EXCHANGE" using
                        by value handle
                        by reference exchange
                        by value 14
                        by reference exchange-type
                        by value 6
                        by value passive
                        by value durable
                        by value 0
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_DECLARE_QUEUE" using
                        by value handle
                        by reference queue-name
                        by value 11
                        by value 0
                        by value 0
                        by value passive
                        by value durable
                        by value exclusive-flag
                        by value auto-delete
                        by value 0
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.


        call "RMQ_BIND_QUEUE" using
                        by value handle
                        by reference queue-name
                        by value 11
                        by reference exchange
                        by value 14
                        by reference binding-key
                        by value 9
                        by value 0
                        giving rv.

        if rv = 0
           call "RMQ_STRERROR" using
                        by value handle
                        by reference error-text
                        by value 50
           end-call

           display error-text
           stop run
        end-if.

        call "RMQ_DISCONNECT" using by value handle.
        stop run.

end program demo03.

The functions RMQ_DECLARE_EXCHANGE, RMQ_DECLARE_QUEUE, and RMQ_BIND_QUEUE are simple wrappers around the corresponding rabbitmq-c functions amqp_exchange_declare(), amqp_queue_declare(), and amqp_queue_bind() respectively.  

It should be noted that if a null or zero-length queue name is supplied to RMQ_DECLARE_QUEUE, then a unique RabbitMQ-generated queue name and its length will be returned in the fourth and fifth arguments of the function call (assuming valid non-null arguments are supplied). In the above example, these arguments have been specified as 0 (null pointers), since a queue name (“cobol-queue”) has been supplied. The final parameter to each of the three wrapper calls has also been specified as null. This parameter may be used to specify a table of optional attributes pertaining to the operation in question (such as specifying a per-queue message TTL or queue expiry when declaring a queue). If this parameter must be used, then the techniques described in the previous section may be utilised to populate and manage the associated data structure.
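
The rule for returning a generated queue name can be sketched in C as shown below. This is an illustration of the behaviour described above rather than the code used by RMQ_DECLARE_QUEUE: the helper name is hypothetical, the generated name is passed in as an argument instead of being obtained from the broker, and it is assumed that the length argument holds the capacity of the output buffer on entry (the COBOL example does not show how the capacity is conveyed).

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: hand a broker-generated queue name back to COBOL.
 *
 * requested/requested_len : name supplied by the caller (may be NULL/empty)
 * generated/generated_len : name chosen by the broker
 * out_name/out_len        : output arguments; either may be NULL
 *
 * Returns 1 on success and 0 on failure.
 */
int return_generated_queue_name(const char *requested, int requested_len,
                                const char *generated, size_t generated_len,
                                char *out_name, int *out_len)
{
    /* A name was supplied by the caller, so there is nothing to return */
    if (requested != NULL && requested_len > 0) {
        return 1;
    }

    /* Null output arguments (as passed in demo03): skip the copy */
    if (out_name == NULL || out_len == NULL) {
        return 1;
    }

    /* Assumption: *out_len holds the capacity of out_name on entry */
    if (generated == NULL || *out_len < 0 ||
        generated_len > (size_t) *out_len) {
        return 0;
    }

    memcpy(out_name, generated, generated_len);

    /* Space-fill the remainder, as COBOL pic x fields expect */
    memset(out_name + generated_len, ' ', (size_t) *out_len - generated_len);

    *out_len = (int) generated_len;
    return 1;
}
```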

Conclusion

This post describes several approaches that can be employed to use AMQP and RabbitMQ from COBOL and other “legacy” languages. Implementing a set of wrapper functions on top of the rabbitmq-c API that can be more readily called from COBOL is a simple and straightforward technique well-suited to most AMQP operations, and the “generic consumer” approach, which provides facilities to expose the functionality that you wish to access via RabbitMQ as a set of functions in a shared library, can greatly simplify development and testing of consumer processes. Handling pointers, dynamic memory, and complex structures can present some challenges, and techniques for dealing with these matters have been described. However, it was also noted that considerable simplification can generally be achieved by implementing a solution specific to the use-case(s) in question, as opposed to trying to implement a complete and fully generic solution.

Approaches to interacting with RabbitMQ from COBOL other than those described here are also possible, and the most appropriate approach should be assessed on a case-by-case basis; however the key point is that there is no impediment to legacy applications participating in a RabbitMQ-based message queuing environment, and indeed the approaches described here are sufficiently generic that they may be readily used and extended to facilitate integration between COBOL and other modern Open Source software technologies.

COBOL is not going away any time soon, and as long as organisations with considerable investments in COBOL are using supported platforms then in many cases they probably do not need to be quite as concerned as some IT vendors might like them to be about the ongoing viability of their software environments. Arguably the biggest issue facing such organisations is availability of skilled COBOL developers; however this problem is soluble. The article http://www.theregister.co.uk/2013/03/11/cobol_paradox/ makes for interesting reading in this regard.

It should be noted that much of the material described in this post relates to versions 0.8 and 0.9.1 of the AMQP protocol, as opposed to 1.0. AMQP 1.0 is a considerable paradigm shift from earlier versions, and the merits of this shift are debatable. RabbitMQ supports versions 0.8.0 and 0.9.1 of AMQP, and as of version 3.1.0 RabbitMQ will also provide partial support for the AMQP 1.0 protocol (see https://github.com/rabbitmq/rabbitmq-amqp1.0)[8]. It will be interesting to monitor the adoption rate of AMQP 1.0, or whether developers will prefer to continue using the popular and highly successful 0.9.1 model. From a legacy integration perspective, there is generally a somewhat better mapping between traditional proprietary message queuing technologies and the AMQP 0.9.1 model than with AMQP 1.0, and organisations considering moving away from such proprietary technologies to an AMQP-based solution should take such matters into careful consideration. A subsequent post will discuss how RabbitMQ might be used to replace some of these traditional message queuing technologies.

Example code

Code for all examples discussed in this post can be found at https://github.com/brc859844/rabbitmq-cobol



[1] Most modern COBOL implementations provide a generic pointer data type that can be used for this purpose. If pointers are not supported then the equivalent can be achieved using an unsigned integer data type of a suitable size (32-bit or 64-bit, depending on the word size of the platform in question).

[2] The typical sequence of calls performed to establish a connection to the broker, login, and open a channel is amqp_new_connection(), amqp_open_socket(), amqp_set_sockfd(), amqp_login(), and amqp_channel_open(), as per the examples provided with the rabbitmq-c API. Additionally, the functions amqp_default_connection_info() and amqp_parse_url() are used to process the supplied connection URI.

[3] The model used by amqp-server of mapping binding keys to functions is also somewhat similar to the model used by Oracle TUXEDO, where TUXEDO service names are mapped to functions in application server processes. Accordingly, the amqp-server approach (and the generic consumer approach in general) presents a potentially viable method of replacing TUXEDO applications with RabbitMQ, particularly if the TUXEDO application uses the STRING, CARRAY, or XML buffer types (FML and VIEW buffers are potentially more problematical to handle, although conversion of these types to other schemes is readily possible). It should also be noted that the RMQ_RPC_CALL function illustrated in the “RPC client” section of this post is not dissimilar in operation to the TUXEDO TPCALL function.

[4] Some obvious enhancements would include specification of pre-fetch counts, optional auto-acknowledgement of consumed messages, specification of various queue characteristics, and support for multiple queues and/or exchanges.
 
[5] See http://www.rabbitmq.com/tutorials/tutorial-six-python.html for a more detailed discussion of the RPC pattern and recommendations for its use (or otherwise).

[6] It should be noted that amqp-server expects the RPC response message to have been dynamically allocated via a call to malloc() or calloc(), and it internally takes care of freeing this memory via a call to free() once the response message has been published to RabbitMQ. Care must therefore be taken to ensure that memory has been correctly allocated using malloc() or calloc() as opposed to using other system services that might be provided by the operating system in question. For example, it would be incorrect on HP OpenVMS to allocate memory for the response buffer using lib$get_vm(). The OpenCOBOL dynamic memory allocation system uses malloc() and calloc(), and for COBOL implementations where such facilities are not directly available, it is straightforward to write simple wrapper functions for these functions so that they can be more readily used from COBOL.

[7] The RPC reply queue is created to be exclusive and auto-delete.

[8] AMQP 1.0 should not be viewed as the natural successor to AMQP 0.9.1. The two protocols are radically different in scope, and support of AMQP 1.0 is from the RabbitMQ perspective little different to supporting other protocols such as MQTT and STOMP. 
