An example ( Communicating between Object REXX threads

An example ( Communicating between Object REXX threads

Post by rony » Fri, 30 Jul 2004 19:44:22


ob:

You can manually guard multiple threads from each other. What you need to do (and what you have done partially already)
is the following:

- tell Object Rexx that your methods which run concurrently should be UNGUARDED,
- use the keyword statements GUARD ON and GUARD OFF to block/activate the threads yourself,
- use the ability to access attributes in the GUARD-statement and test them for a change in state (that's actually done
by Rexx, which is another very neat feature).

Here's a short multithreaded Object Rexx program with Object Rexx' default support (making it really easy to code it):

- there are two classes
- "class X" can put items into a buffer (method "testWrite")
and take items from the buffer (method "testRead")

- multithreading of the methods is invoked with the REPLY-statement, returning
control to the main program, yet the rest of the method executes concurrently

- "class FIFO" allows storing received items in an Object Rexx queue (method "write")
and remove and return items from the queue (method "read") in a "first-in-first-out"
fashion, hence its name

- all methods of an instance of this class are executed non-concurrently, the
Object Rexx runtime guarantees it

- the main program

- creates three instances of class "X", two to write to a buffer (objects named "a"
and "b") and one to read from a buffer (object named "c")

- creates one instance of the class "FIFO", which is shared among "a", "b", "c", ie.
they all will access the same buffer concurrently, this object is named "fifo"

- the .Local environment is used to store the number of the desired write-repetitions
(used in the method "testwrite" of class "X")

- then "a" is used to write string objects "from_a" to the "fifo" buffer-object, this
method runs concurrently with the main program due to the REPLY-statement in that
method

the same is done via object "b", this time the string "FROM_B" is to be used; again
this is executed concurrently to "a" and the main program!

- then "c" is used to read the items from the fifo-buffer and display them using the
method "testread" which is executed concurrently because of the "REPLY" statement
with "a"'s testwrite, "b"'s testwrite and the main program!

Sounds complicated, but just run the program.

-------------- cut here ---------------
/* */
a=.x~new
b=.x~new
c=.x~new
fifo=.fifo~new /* FIFO-instance */
.local~repetitions = 500
a~testwrite(fifo, "from_a")
b~testwrite(fifo, "FROM_B")
c~testread(fifo)
say "after testread"

/* a class which allows filling and querying a buffer */
::class X

::method testwrite
use arg fifo, msg1
REPLY
do i=1 to .repetitions
fifo~write(msg1 i)
end

::method testread
use arg fifo
REPLY
do while fifo~items > 0
i=fifo~read
say i
end

/* FIFO-buffer, realized by using Object Rexx class .Queue */
/* concurrent access is serialized by Object Rexx */
::class FIFO

::method init
expose buffer
buffer=.queue~new

::method write
expose buffer
use arg tmp
buffer~queue(tmp)

::method read
expose buffer
return buffer~pull

::method items
expose buffer
return buffer~items
-------------- cut here -----------
 
 
 

An example ( Communicating between Object REXX threads

Post by Bob Star » Sat, 31 Jul 2004 18:22:54

Thanks, Rony, A very nice example, especially of GUARD ON. I pasted them in,
ran them, modified them a bit to conduct experiments. This approach is more
portable than my use of SysRequestMutexSem().

Where I'm struggling is how to get my SQL thread to WAIT on a queue. My
experiments show that the PULL method for objects of the .queue class will
not wait for something to arrive on the queue, they will instead return
.nil.

Seems like I could have a queue_has_data attribute which is initialized to
.false, but is set to .true by the write method, and set to .false after the
read method has reduced buffer~items to zero. The read method could use
GUARD ON WHEN queue_has_data=.true
GUARD OFF
data=buffer~pull
...

I'll experiment in this domain.
Sure seems like it would have been easier if RXQUEUE() was thread-safe,
though.
Regards,
Bob Stark



threads quite easily, such that your particular
one thread is used exclusively to access the

 
 
 

An example ( Communicating between Object REXX threads

Post by rony » Sat, 31 Jul 2004 20:15:29

i Bob,

Bob Stark wrote:

Hmm, just experimented with external queues (they can be created without a Rexx program being active, i.e. they can be
re-used [and you can even have different processes feeding the thread which really interfaces with SQL; in this scenario
you would need to devise some protocol to get results to those processes].

Here's an example where the reader-thread is blocked while waiting on data to be put into the external queue named
"test4bob":

-------------------- cut here ---------------------
.local~bob4sql="bobo4sql" -- define name of external Rexx-queue

call rxQueue "create", .bob4sql -- create an external Rexx-queue
call rxQueue "set", .bob4sql -- set to use it

o=.test4bob~new -- create an instance of class TEST4BOB

infos=.array~of("hi", "how are you doing?", "stop!")

do item over infos -- loop over strings stored in array
call sysSleep random(0,2500)/1000 -- sleep randomly
o~read -- now try to read
call sysSleep random(0,2500)/1000 -- sleep randomly
o~write(item) -- now write item
end

call rxQueue "delete", .bob4sql -- delete external queue (it'll survive this run!)


::class test4bob

::method read unguarded
say "<<< in read " date("s") time()
reply
say linein("queue:") -- read from external queue (and wait, if necessary)

::method write unguarded
say ">>> in write" date("s") time()
reply
parse arg val
call lineout "queue:", val -- write string to external queue
-------------------- cut here ---------------------

To make that example to execute even more randomly you could use this version of the above program (but note, that in
this case you should not delete the external queue at the end of the main program, as it is very likely that at that
time there are still pending Object Rexx threads which need to get access to the external Rexx queue after they wake up!):

-------------------- cut here ---------------------

.local~bob4sql="bobo4sql" -- define name of external Rexx-queue

call rxQueue "create", .bob4sql -- create an external Rexx-queue
call rxQueue "set", .bob4sql -- set to use it

o=.test4bob~new -- create an instance of class TEST4BOB

infos=.array~of("hi", "how are you doing?", "stop!")

do item over infos -- loop over strings stored in array
o~read -- now try to read
o~write(item) -- now write item
end

-- can't delete anymore queue, as it is possible that there are still threads
-- waiting to work on the queue at this time!
-- call rxQueue "delete", .bob4sql -- delete external queue (it'll survive this run!)


::class test4bob

::method read unguarded
sleeptime=random(0,2500)/1000
say "<<< in read " date("s") time() "sleeping:" sleeptime "secs"
reply -- return, but continue processing in this thread
call sysSleep sleeptime -- sleep randomly
-- read from external queue (and wait, if necessary)
say linein("queue:") "[" || date("s") time() || "]"

::method write unguarded
sleeptime=random(0,2500)/1000
say ">>> in write" date("s") time() "queuing: ["arg(1)~string"]" "sleeping:" sleeptime "secs"
reply -- return, but continue processing in this
 
 
 

An example ( Communicating between Object REXX threads

Post by Bob Star » Sat, 31 Jul 2004 23:15:28

Now you are closer to where I started. I devised that protocol;
Requester (client) creates two temporary queues:
1. Contents of variables to be transported to the SQL thread
2. Return data (variables returned by the SQL thread). The requester ends up
waiting on this queue.

Client creates the two queues above, then writes a single line to the
SQL_processing queue. This transaction contains the SQL to be executed, and
the names of the two temporary queues. The SQL processing thread wakes up,
reads the transaction, gets the variables for this transaction from the
incoming data queue, executes the SQL, then queues the results to the return
data queue, followed by an END marker so that the client knows it has
received all the data, then goes back to waiting for the next transaction.

This whole design fell apart because the RXQUEUE function is not
thread-safe. You didn't hit that in your test, because you only used one
queue. You cannot have multiple threads reading and writing different
queues, because they will get each others' data and wake up at the wrong
time.

I did all of this as internal REXX functions, not taking advantage of object
orientation (shame on me!).

Perhaps I should create an SQL transaction object, that would house an array
or relation for the input data, and another array/relation for the results.
I'd need to find an atomic way of passing a reference to this object over to
the SQL thread.

Another random thought is to derive a thread-safe rxqueue class that used
guard on to make certain that no two threads switched the current queue
simultaneously. It would need to have it's own pull, push, and queue
methods, that would be protected by guard.

Regards,
Bob Stark



Rexx program being active, i.e. they can be
which really interfaces with SQL; in this scenario
to be put into the external queue named
 
 
 

An example ( Communicating between Object REXX threads

Post by Bob Star » Sun, 01 Aug 2004 19:01:37

Rony, [and any other onlookers whose eyeballs have yet to glaze over!],

Here is the Queue method from the class I wrote to wrap the rxQueue()
function to make it thread safe. It's not quite working yet - in a
multi-threaded environment, only one thread runs to completion, the others
hang up on ghe GUARD ON clause.

Reading the doc for the GUARD keyword, it mentions that it is the object
variable pool that is being serialized; specifically, any variables
mentioned on an EXPOSE. Because I want to serialize any calls to RxQueue()
from any thread within this process, I used the local object to store the
lock variable:
.Local['xRxQueueLock'] = .true
This is not exposed, which seems a reasonable explanation of why it is not
working.
How would you expose such a variable. Or perhaps, thinking outside of the
box, is there another way to handle this variable? Something at the class
level instead of at the instance variable?

::Method Queue Unguarded
Expose qname haveData
Signal On Syntax Name oRxError
Signal On Novalue Name oRxError
Signal On Halt Name oRxError
Trace Value Debug(self~Class~ID'~Queue method entered for' qName)

Use Arg data
GUARD ON WHEN .Local['xRxQueueLock']=.false /* block until unlocked */
.Local['xRxQueueLock'] = .true /* now set attribute lock to "1" */
Call SysSleep 0.1
GUARD OFF /* allow concurrent execution */
Call Debug 'QUEUE: Guard OFF' Time('L'),
SysQueryProcess('PID')'.'SysQueryProcess('TID')
Call SysSleep 0.1

curQueue = rxQueue('SET', qName)
Queue data
havedata = .true

GUARD ON /* Disallow concurrent execution */
Call SysSleep 0.1
Call Debug 'QUEUE: Guard ON' Time('L'),
SysQueryProcess('PID')'.'SysQueryProcess('TID')
.Local['xRxQueueLock'] = .false
Call Debug self~Class~ID'~Queue method exit for' qName
Return

/*RXCOPY ORXERROR DUP 4 LINES COPIED ON 07-30-04 AT 11:40**************/
/*Start of orxError-[rxCopy this into EACH method]------Version-01.02-*/
orxError: PARSE SOURCE . . _ex_name
IF SOURCELINE() > 0 THEN _srcLine = SOURCELINE(sigl); ELSE _srcline =''
INTERPRET
orxErMsg(CONDITION('O'),sigl,_srcLine,_ex_name,VALUE('rc'),VALUE('ok'))

::ROUTINE DEBUG
/* DEBUG is selectively called by other methods to perform tracing and*/
/* debug messages. This method returns data that can be passed to the*/
/* TRACE VALUE keyword. */
IF 1 THEN SAY ARG(1) /* IF 0 ... to stop trace msgs */
RETURN 'N' /* Return N for normal trace */



to be put into the external queue named
 
 
 

An example ( Communicating between Object REXX threads

Post by rony » Mon, 02 Aug 2004 06:36:28

Bob:

::class BobsClass
::Method Init
expose rxQueueLock -- expose the attribute
rxQueueLock=.false -- initialize it to .false



Expose qname haveData xRxQueueLock -- expose the attributes


GUARD ON WHEN xRxQueueLock=.false -- wait on this attribute to become .false


xRxQueueLock=.true -- now set it to .true


xRxQueueLock=.false -- now reset attribute's value


In all other methods of this class of yours you need to apply the very same strategy (don't forget mentioning
"xRxQueueLock" on their EXPOSE statements).

[The variables in the "object variable pool" are called "attributes" to emphasize that they are readily available in all
methods, given that you establish direct access to them via the EXPOSE statement.]

---

Again, I think your problem can be solved in a *much* easier way (cf. my last posting entitled "Maybe using an
asynchroneous message helps in your case? (Re: An", posted yesterday, i.e. 2004-07-31 00:23:48) as you do not need to
devise your own locking, but merely take exploit what has been already put into Object Rexx and still have concurrent
execution of different parts of your program. [There you can *directly* submit any arguments to the worker method and
get at the result of that particular invocation.]

Hope that helps,

---rony