12.4.1.2 Yielding from foreign predicates
Starting with SWI-Prolog 8.5.5 we provide an experimental interface that allows using a SWI-Prolog engine for asynchronous processing. The idea is that an engine that calls a foreign predicate which would need to block may be suspended and later resumed. For example, consider an application that listens to a large number of network connections (sockets). SWI-Prolog offers three scenarios to deal with this:
- Using a thread per connection. This model fits Prolog well, as it allows keeping state, e.g., in a DCG using phrase_from_stream/2. Maintaining an operating system thread per connection uses a significant amount of resources, though.
- Using wait_for_input/3, a single thread can wait for many connections. Each time input arrives, we must associate it with a state machine and advance this machine using a chunk of input of unknown size. Programming such a state machine in Prolog is typically tedious. Although we can use delimited continuations (see section 4.9) in some scenarios, this is not a universal solution.
- Using the primitives from this section, we can create an engine (see PL_create_engine()) to handle a connection with the same benefits as using threads. When the engine calls a foreign predicate that would need to block, the predicate calls PL_yield_address() to suspend the engine. An overall scheduler watches for ready connections and calls PL_next_solution() to resume the suspended engine. This approach allows processing many connections on the same operating system thread.
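To make the cost of the second approach concrete, the sketch below uses plain C to show the kind of explicit state a single-threaded reader must carry between chunks of unknown size. The names `line_state` and `feed()` are hypothetical illustrations and are not part of any SWI-Prolog API. The machine only counts lines and tracks the longest one. Even so, a line split across two chunks already forces the partial line length to live outside the reading loop.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical example: state that must survive between input chunks. */
typedef struct {
  size_t lines;     /* number of complete lines seen */
  size_t cur_len;   /* length of the line still in progress */
  size_t max_len;   /* length of the longest complete line */
} line_state;

/* Advance the state machine with a chunk of arbitrary size.
   A chunk may end in the middle of a line. */
static void feed(line_state *st, const char *chunk, size_t n)
{
  assert(st != NULL);
  for (size_t i = 0; i < n; i++) {
    if (chunk[i] == '\n') {          /* a line completes */
      if (st->cur_len > st->max_len)
        st->max_len = st->cur_len;
      st->lines++;
      st->cur_len = 0;
    } else {                         /* extend the partial line */
      st->cur_len++;
    }
  }
}
```

For example, feeding the chunks `"ab"`, `"c\nde"` and `"\n"` yields two lines, the longest being three characters long. A thread or a yielding engine would instead keep this state implicitly in the call stack.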
As is, these features can only be used through the foreign language interface. They were added after discussion with Mattijs van Otterdijk, aiming at using SWI-Prolog together with Rust's asynchronous programming support. Note that this feature is related to the engine API as described in section 11. It is different though: where the Prolog engine API allows for communicating with a Prolog engine, the facilities of this section merely allow an engine to suspend and be resumed later.
To prepare a query for asynchronous usage, we first create an engine using PL_create_engine(). Next, we create a query in the engine using PL_open_query() with the flags PL_Q_ALLOW_YIELD and PL_Q_EXT_STATUS. A foreign predicate that needs to be capable of suspending must be registered using PL_register_foreign() with the flags PL_FA_VARARGS and PL_FA_NONDETERMINISTIC; i.e., only non-deterministic predicates can yield. This is not a restriction, as a non-deterministic predicate can always return TRUE to indicate deterministic success. Finally, PL_yield_address() allows the predicate to yield control and prepares it to be resumed, similar to what PL_retry_address() does for non-deterministic results. PL_next_solution() returns PL_S_YIELD if a predicate calls PL_yield_address(). The query may then be resumed by calling PL_next_solution() again with the same query id (qid). We illustrate the above using some example fragments.
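The control flow just described can be modelled without SWI-Prolog. The sketch below is a self-contained simulation, not the real API, and all of its names are hypothetical:

- `ctl_kind` stands in for the control codes PL_FIRST_CALL, PL_RESUME and PL_PRUNED.
- `step_status` stands in for the PL_next_solution() results FALSE, TRUE and PL_S_YIELD.
- `conn_state` plays the role of the address handed to PL_yield_address().

The driver resumes the step as long as it yields. It pretends that two more bytes become available after each yield, and prunes the step once no more input can arrive.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins; these are NOT the SWI-Prolog definitions. */
typedef enum { CTL_FIRST_CALL, CTL_RESUME, CTL_PRUNED } ctl_kind;
typedef enum { ST_FAIL, ST_TRUE, ST_YIELD } step_status;

/* Models the state a real predicate would pass to PL_yield_address(). */
typedef struct {
  const char *input;   /* all data that will eventually arrive */
  size_t      avail;   /* bytes that have "arrived" so far */
  size_t      pos;     /* bytes consumed so far */
  char        out[64]; /* characters collected before the newline */
} conn_state;

/* Model of a yielding predicate: collect characters up to a newline,
   yielding whenever the available input is exhausted. */
static step_status read_line_step(ctl_kind ctl, conn_state *st)
{
  assert(st != NULL);
  switch (ctl) {
    case CTL_FIRST_CALL:               /* fresh call: reset the state */
      st->pos = 0;
      st->out[0] = '\0';
      break;
    case CTL_RESUME:                   /* resumed: continue where we left off */
      break;
    case CTL_PRUNED:                   /* discarded: clean up (nothing owned) */
      return ST_FAIL;
  }
  while (st->pos < st->avail) {
    char c = st->input[st->pos++];
    if (c == '\n')
      return ST_TRUE;                  /* deterministic success */
    size_t len = strlen(st->out);
    if (len + 1 < sizeof st->out) {    /* silently truncate long lines */
      st->out[len] = c;
      st->out[len + 1] = '\0';
    }
  }
  return ST_YIELD;                     /* would block: suspend */
}

/* Models the PL_next_solution() loop: resume while the step yields. */
static step_status run_query(conn_state *st, int *yields)
{
  size_t total = strlen(st->input);
  step_status rc = read_line_step(CTL_FIRST_CALL, st);
  *yields = 0;
  while (rc == ST_YIELD) {
    if (st->avail >= total) {          /* no more data will ever arrive */
      read_line_step(CTL_PRUNED, st);  /* let the step clean up */
      return ST_FAIL;
    }
    (*yields)++;
    st->avail = (st->avail + 2 > total) ? total : st->avail + 2;
    rc = read_line_step(CTL_RESUME, st);
  }
  return rc;
}
```

Running the model on the input `"hello\n"` with no bytes initially available yields three times and then succeeds with `out` equal to `"hello"`.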
First, let us create a predicate that reads the available input from a Prolog stream and yields if reading would block. Note that our predicate must use the PL_FA_VARARGS interface, which implies that the first argument is in a0, the second in a0+1, etc. (The other foreign interfaces do not support the yield API.)
```c
#include <assert.h>
#include <SWI-Stream.h>         /* must precede SWI-Prolog.h for IOSTREAM */
#include <SWI-Prolog.h>

/** read_or_block(+Stream, -String) is det. */

#define BUFSIZE 4096

static foreign_t
read_or_block(term_t a0, int arity, void *context)
{ IOSTREAM *s;

  switch(PL_foreign_control(context))
  { case PL_FIRST_CALL:
      if ( PL_get_stream(a0, &s, SIO_INPUT) )
      { Sset_timeout(s, 0);     // do not block when reading
        break;
      }
      return FALSE;
    case PL_RESUME:             // resumed after PL_yield_address()
      s = PL_foreign_context_address(context);
      break;
    case PL_PRUNED:             // query discarded: release the stream
      s = PL_foreign_context_address(context);
      return PL_release_stream(s);
    default:
      assert(0);
      return FALSE;
  }

  char buf[BUFSIZE];
  size_t n = Sfread(buf, sizeof buf[0], sizeof buf / sizeof buf[0], s);

  if ( n == 0 )                 // timeout or error
  { if ( (s->flags&SIO_TIMEOUT) )
      PL_yield_address(s);      // timeout: suspend the engine
    else
      return PL_release_stream(s); // raises the stream's error, if any
  } else
  { return ( PL_release_stream(s) &&
             PL_unify_chars(a0+1, PL_STRING|REP_ISO_LATIN_1, n, buf) );
  }
}
```
This function must be registered using PL_register_foreign():
```c
PL_register_foreign("read_or_block", 2, read_or_block,
                    PL_FA_VARARGS|PL_FA_NONDETERMINISTIC);
```
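read_or_block() decides whether to yield by setting a zero timeout on the stream and testing SIO_TIMEOUT. For readers more familiar with POSIX descriptors, the sketch below makes the same decision with a non-blocking file descriptor. A read() that fails with EAGAIN (or EWOULDBLOCK) corresponds to the point where read_or_block() calls PL_yield_address(). The names `read_result`, `set_nonblocking()` and `read_available()` are hypothetical. This is an analogy, not a description of how SWI-Prolog streams are implemented.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical result codes for the analogy. */
typedef enum { RD_DATA, RD_WOULD_BLOCK, RD_EOF, RD_ERROR } read_result;

/* Make reads on fd return immediately, comparable to Sset_timeout(s, 0). */
static int set_nonblocking(int fd)
{
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags == -1)
    return -1;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Read whatever is available. RD_WOULD_BLOCK marks the situation in which
   read_or_block() would call PL_yield_address(). */
static read_result read_available(int fd, char *buf, size_t size, size_t *n)
{
  assert(buf != NULL && n != NULL);
  ssize_t r = read(fd, buf, size);
  *n = r > 0 ? (size_t)r : 0;
  if (r > 0)
    return RD_DATA;
  if (r == 0)
    return RD_EOF;                     /* writer closed the connection */
  if (errno == EAGAIN || errno == EWOULDBLOCK)
    return RD_WOULD_BLOCK;             /* nothing available yet: "yield" */
  return RD_ERROR;
}
```

On an empty pipe, `read_available()` reports `RD_WOULD_BLOCK` instead of blocking. After data is written it reports `RD_DATA`, and after the write end is closed it reports `RD_EOF`.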
Next, create an engine to run handle_connection/1 on a Prolog stream. Note that we omitted most of the error checking for readability. Also note that we must make our engine current using PL_set_engine() before we can interact with it.
```c
qid_t
start_connection(IOSTREAM *c)
{ predicate_t p = PL_predicate("handle_connection", 1, "user");
  PL_engine_t e = PL_create_engine(NULL);
  PL_engine_t old;

  // PL_set_engine() returns PL_ENGINE_SET (0) on success
  if ( PL_set_engine(e, &old) == PL_ENGINE_SET )
  { term_t av = PL_new_term_refs(1);
    PL_unify_stream(av+0, c);
    qid_t q = PL_open_query(NULL,
                            PL_Q_CATCH_EXCEPTION|
                            PL_Q_ALLOW_YIELD|
                            PL_Q_EXT_STATUS,
                            p, av);
    PL_set_engine(old, NULL);   // restore the previous engine
    return q;
  }

  return 0;                     // error: could not make the engine current
}
```
Finally, our foreign code must manage this engine. Normally it will do so together with many other engines. First, we write a function that runs a query in the engine to which it belongs. (Possibly, future versions of PL_next_solution() may do that, although the value is in general limited because interacting with the arguments of the query requires the query's engine to be current anyway.)
```c
int
PL_engine_next_solution(qid_t qid)
{ PL_engine_t old;
  int rc;

  if ( PL_set_engine(PL_query_engine(qid), &old) == PL_ENGINE_SET )
  { rc = PL_next_solution(qid);
    PL_set_engine(old, NULL);
  } else
    rc = FALSE;

  return rc;
}
```
Now we can simply handle a connection using the loop below which restarts the query as long as it yields. Realistic code manages multiple queries and will (in this case) use the POSIX poll() or select() interfaces to activate the next query that can continue without blocking.
```c
int rc;

do
{ rc = PL_engine_next_solution(qid);
} while( rc == PL_S_YIELD );
```
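The paragraph above only outlines the multi-query case. The following sketch shows the shape of such a scheduler under simplifying assumptions:

- Each suspended query is replaced by a hypothetical `task` that waits on one file descriptor and completes once it has received a full line.
- "Resuming" a task stands in for calling PL_engine_next_solution().
- poll() finds the descriptors that can be read without blocking, so a single thread serves all connections.

None of these names are part of the SWI-Prolog API.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

#define MAX_TASKS 16

/* Hypothetical stand-in for a suspended query; NOT an SWI-Prolog type. */
typedef struct {
  int    fd;        /* connection the task is waiting on */
  int    done;      /* 1 once the task no longer yields */
  char   line[64];  /* bytes received so far (NUL-terminated) */
  size_t len;       /* number of bytes in line */
} task;

/* "Resume" a task whose descriptor poll() reported as ready. A single
   read() on a ready descriptor does not block. */
static void task_resume(task *t)
{
  size_t room = sizeof t->line - 1 - t->len;
  ssize_t r = room ? read(t->fd, t->line + t->len, room) : 0;
  if (r <= 0) {                 /* EOF, error or full buffer: finish */
    t->done = 1;
    return;
  }
  t->len += (size_t)r;
  t->line[t->len] = '\0';
  if (memchr(t->line, '\n', t->len))
    t->done = 1;                /* complete line: the "query" completes */
  /* otherwise the task stays suspended, like a query that yielded */
}

/* Scheduler: poll all suspended tasks and resume the ready ones. Returns
   the number of tasks still suspended when poll() times out or fails
   (0 means all tasks completed). */
static int run_ready(task *tasks, int ntasks, int timeout_ms)
{
  struct pollfd pfd[MAX_TASKS];
  int idx[MAX_TASKS];
  assert(ntasks <= MAX_TASKS);
  for (;;) {
    int n = 0;
    for (int i = 0; i < ntasks; i++) {
      if (tasks[i].done)
        continue;
      pfd[n].fd = tasks[i].fd;
      pfd[n].events = POLLIN;
      pfd[n].revents = 0;
      idx[n++] = i;
    }
    if (n == 0)
      return 0;
    if (poll(pfd, (nfds_t)n, timeout_ms) <= 0)
      return n;                 /* timeout or error: n tasks still waiting */
    for (int k = 0; k < n; k++)
      if (pfd[k].revents & (POLLIN | POLLHUP | POLLERR))
        task_resume(&tasks[idx[k]]);
  }
}
```

In a real program, `task_resume()` would presumably call PL_engine_next_solution() on the task's query and treat any result other than PL_S_YIELD as completion, after which the query is closed.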
After the query completes, it must be closed using PL_close_query() or PL_cut_query(). The engine may then be destroyed using PL_destroy_engine() or reused for a new query.
- (return) foreign_t PL_yield_address(void *)
- Cause PL_next_solution() of the active query to return with PL_S_YIELD. A subsequent call to PL_next_solution() on the same query calls the foreign predicate again with the control status set to PL_RESUME, after which PL_foreign_context_address() retrieves the address passed to this function. The state of the Prolog engine is maintained, including term_t handles. If the passed address must be invalidated (e.g., memory freed or a stream released), the predicate must do so when it finally returns either TRUE or FALSE. If the engine discards the predicate, the predicate is called with status PL_PRUNED, in which case it must clean up.
- int PL_can_yield(void)
- Returns TRUE when called from inside a foreign predicate if the query that (indirectly) calls this foreign predicate can yield using PL_yield_address(). Returns FALSE when either there is no current query or the query cannot yield.
Discussion
Asynchronous processing has become popular with modern programming languages, especially those aiming at network communication. Asynchronous processing uses fewer resources than threads while avoiding most of the complications associated with thread synchronization if only a single thread is used to manage the various states. The lack of good support for destructive state updates in Prolog makes it attractive to use threads for dealing with multiple inputs. The fact that Prolog discourages using shared global data such as dynamic predicates typically makes multithreaded code easy to manage.
It is not clear how much scalability we gain using Prolog engines instead of Prolog threads. The only difference between the two is the operating system thread. Prolog engines are still rather memory intensive, mainly depending on the stack sizes. Global garbage collection (of atoms and clauses) needs to process all the stacks of all the engines and thus limits scalability.
One possible future direction is to allow all (possibly) blocking Prolog predicates to use the yield facility and to provide a Prolog API to manage sets of engines that use this type of yielding. As is, these features are designed to allow SWI-Prolog to cooperate with languages that provide asynchronous functions.