diff options
Diffstat (limited to 'flow/gsl/gslengine.c')
-rw-r--r-- | flow/gsl/gslengine.c | 753 |
1 files changed, 753 insertions, 0 deletions
diff --git a/flow/gsl/gslengine.c b/flow/gsl/gslengine.c new file mode 100644 index 0000000..65ee620 --- /dev/null +++ b/flow/gsl/gslengine.c @@ -0,0 +1,753 @@ +/* GSL Engine - Flow module operation engine + * Copyright (C) 2001 Tim Janik + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ +#include "gslengine.h" + +#include "gslcommon.h" +#include "gslopnode.h" +#include "gslopmaster.h" + + +/* --- prototypes --- */ +static void wakeup_master (void); + + +/* --- UserThread --- */ +GslModule* +gsl_module_new (const GslClass *klass, + gpointer user_data) +{ + EngineNode *node; + guint i; + + g_return_val_if_fail (klass != NULL, NULL); + g_return_val_if_fail (klass->process != NULL || klass->process_defer != NULL, NULL); + if (klass->process_defer) + { + g_warning ("%s: Delay cycle processing not yet implemented", G_STRLOC); + return NULL; + } + + node = gsl_new_struct0 (EngineNode, 1); + + /* setup GslModule */ + node->module.klass = klass; + node->module.user_data = user_data; + node->module.istreams = klass->n_istreams ? gsl_new_struct0 (GslIStream, ENGINE_NODE_N_ISTREAMS (node)) : NULL; + node->module.jstreams = klass->n_jstreams ? gsl_new_struct0 (GslJStream, ENGINE_NODE_N_JSTREAMS (node)) : NULL; + node->module.ostreams = _engine_alloc_ostreams (ENGINE_NODE_N_OSTREAMS (node)); + + /* setup EngineNode */ + node->inputs = ENGINE_NODE_N_ISTREAMS (node) ? gsl_new_struct0 (EngineInput, ENGINE_NODE_N_ISTREAMS (node)) : NULL; + node->jinputs = ENGINE_NODE_N_JSTREAMS (node) ? gsl_new_struct0 (EngineJInput*, ENGINE_NODE_N_JSTREAMS (node)) : NULL; + node->outputs = ENGINE_NODE_N_OSTREAMS (node) ? gsl_new_struct0 (EngineOutput, ENGINE_NODE_N_OSTREAMS (node)) : NULL; + node->output_nodes = NULL; + node->integrated = FALSE; + gsl_rec_mutex_init (&node->rec_mutex); + for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++) + { + node->outputs[i].buffer = node->module.ostreams[i].values; + node->module.ostreams[i].sub_sample_pattern = gsl_engine_sub_sample_test (node->module.ostreams[i].values); + } + node->flow_jobs = NULL; + node->fjob_first = NULL; + node->fjob_last = NULL; + + return &node->module; +} + +/** + * gsl_module_tick_stamp + * @module: a GSL engine module + * @RETURNS: the module's tick stamp, indicating its process status + * + * Any thread may call this function on a valid engine module. + * The module specific tick stamp is updated to gsl_tick_stamp() + + * @n_values every time its GslProcessFunc() function was + * called. See also gsl_tick_stamp(). + */ +guint64 +gsl_module_tick_stamp (GslModule *module) +{ + g_return_val_if_fail (module != NULL, 0); + + return ENGINE_NODE (module)->counter; +} + +/** + * gsl_job_integrate + * @module: The module to integrate + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job to integrate @module into the engine. + */ +GslJob* +gsl_job_integrate (GslModule *module) +{ + GslJob *job; + + g_return_val_if_fail (module != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_INTEGRATE; + job->data.node = ENGINE_NODE (module); + + return job; +} + +/** + * gsl_job_discard + * @module: The module to discard + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which removes @module from the + * engine and destroys it. + */ +GslJob* +gsl_job_discard (GslModule *module) +{ + GslJob *job; + + g_return_val_if_fail (module != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_DISCARD; + job->data.node = ENGINE_NODE (module); + + return job; +} + +/** + * gsl_job_connect + * @src_module: Module with output stream + * @src_ostream: Index of output stream of @src_module + * @dest_module: Module with unconnected input stream + * @dest_istream: Index of input stream of @dest_module + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which connects the output stream @src_ostream + * of module @src_module to the input stream @dest_istream of module @dest_module + * (it is an error if the input stream is already connected by the time the job + * is executed). + */ +GslJob* +gsl_job_connect (GslModule *src_module, + guint src_ostream, + GslModule *dest_module, + guint dest_istream) +{ + GslJob *job; + + g_return_val_if_fail (src_module != NULL, NULL); + g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL); + g_return_val_if_fail (dest_module != NULL, NULL); + g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_ICONNECT; + job->data.connection.dest_node = ENGINE_NODE (dest_module); + job->data.connection.dest_ijstream = dest_istream; + job->data.connection.src_node = ENGINE_NODE (src_module); + job->data.connection.src_ostream = src_ostream; + + return job; +} + +GslJob* +gsl_job_jconnect (GslModule *src_module, + guint src_ostream, + GslModule *dest_module, + guint dest_jstream) +{ + GslJob *job; + + g_return_val_if_fail (src_module != NULL, NULL); + g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL); + g_return_val_if_fail (dest_module != NULL, NULL); + g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_JCONNECT; + job->data.connection.dest_node = ENGINE_NODE (dest_module); + job->data.connection.dest_ijstream = dest_jstream; + job->data.connection.src_node = ENGINE_NODE (src_module); + job->data.connection.src_ostream = src_ostream; + + return job; +} + +/** + * gsl_job_disconnect + * @dest_module: Module with connected input stream + * @dest_istream: Index of input stream of @dest_module + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which causes the input stream @dest_istream + * of @dest_module to be disconnected (it is an error if the input stream isn't + * connected by the time the job is executed). + */ +GslJob* +gsl_job_disconnect (GslModule *dest_module, + guint dest_istream) +{ + GslJob *job; + + g_return_val_if_fail (dest_module != NULL, NULL); + g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_IDISCONNECT; + job->data.connection.dest_node = ENGINE_NODE (dest_module); + job->data.connection.dest_ijstream = dest_istream; + job->data.connection.src_node = NULL; + job->data.connection.src_ostream = ~0; + + return job; +} + +GslJob* +gsl_job_jdisconnect (GslModule *dest_module, + guint dest_jstream, + GslModule *src_module, + guint src_ostream) +{ + GslJob *job; + + g_return_val_if_fail (dest_module != NULL, NULL); + g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL); + g_return_val_if_fail (src_module != NULL, NULL); + g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_JDISCONNECT; + job->data.connection.dest_node = ENGINE_NODE (dest_module); + job->data.connection.dest_ijstream = dest_jstream; + job->data.connection.src_node = ENGINE_NODE (src_module); + job->data.connection.src_ostream = src_ostream; + + return job; +} + +GslJob* +gsl_job_set_consumer (GslModule *module, + gboolean is_toplevel_consumer) +{ + GslJob *job; + + g_return_val_if_fail (module != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = is_toplevel_consumer ? ENGINE_JOB_SET_CONSUMER : ENGINE_JOB_UNSET_CONSUMER; + job->data.node = ENGINE_NODE (module); + + return job; +} + +/** + * GslAccessFunc + * @module: Module to operate on + * @data: Accessor data + * + * The GslAccessFunc is a user supplied callback function which can access + * a module in times it is not processing. Accessors are usually used to + * either read out a module's current state, or to modify its state. An + * accessor may only operate on the @data and the @module passed + * in to it. + */ +/** + * gsl_job_access + * @module: The module to access + * @access_func: The accessor function + * @data: Data passed in to the accessor + * @free_func: Function to free @data + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which will invoke @access_func + * on @module with @data when the transaction queue is processed + * to modify the module's state. + */ +GslJob* +gsl_job_access (GslModule *module, + GslAccessFunc access_func, + gpointer data, + GslFreeFunc free_func) +{ + GslJob *job; + + g_return_val_if_fail (module != NULL, NULL); + g_return_val_if_fail (access_func != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_ACCESS; + job->data.access.node = ENGINE_NODE (module); + job->data.access.access_func = access_func; + job->data.access.data = data; + job->data.access.free_func = free_func; + + return job; +} + +/** + * gsl_flow_job_access + */ +GslJob* +gsl_flow_job_access (GslModule *module, + guint64 tick_stamp, + GslAccessFunc access_func, + gpointer data, + GslFreeFunc free_func) +{ + GslJob *job; + EngineFlowJob *fjob; + + g_return_val_if_fail (module != NULL, NULL); + g_return_val_if_fail (access_func != NULL, NULL); + + fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAccess, 1); + fjob->fjob_id = ENGINE_FLOW_JOB_ACCESS; + fjob->any.tick_stamp = tick_stamp; + fjob->access.access_func = access_func; + fjob->access.data = data; + fjob->access.free_func = free_func; + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_FLOW_JOB; + job->data.flow_job.node = ENGINE_NODE (module); + job->data.flow_job.fjob = fjob; + + return job; +} + +GslJob* +gsl_flow_job_suspend (GslModule *module, + guint64 tick_stamp) +{ + GslJob *job; + EngineFlowJob *fjob; + + g_return_val_if_fail (module != NULL, NULL); + + fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1); + fjob->fjob_id = ENGINE_FLOW_JOB_SUSPEND; + fjob->any.tick_stamp = tick_stamp; + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_FLOW_JOB; + job->data.flow_job.node = ENGINE_NODE (module); + job->data.flow_job.fjob = fjob; + + return job; +} + +GslJob* +gsl_flow_job_resume (GslModule *module, + guint64 tick_stamp) +{ + GslJob *job; + EngineFlowJob *fjob; + + g_return_val_if_fail (module != NULL, NULL); + + fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1); + fjob->fjob_id = ENGINE_FLOW_JOB_RESUME; + fjob->any.tick_stamp = tick_stamp; + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_FLOW_JOB; + job->data.flow_job.node = ENGINE_NODE (module); + job->data.flow_job.fjob = fjob; + + return job; +} + +/** + * GslPollFunc + * @data: Data of poll function + * @n_values: Minimum number of values the engine wants to process + * @timeout_p: Location of timeout value + * @n_fds: Number of file descriptors used for polling + * @fds: File descriptors to be used for polling + * @revents_filled: Indicates whether @fds actually have their ->revents field filled with valid data. + * @Returns: A boolean value indicating whether the engine should process data right now + * + * The GslPollFunc is a user supplied callback function which can be hooked into the + * GSL engine. The engine uses the poll functions to determine whether processing of + * @n_values in its module network is necessary. + * In order for the poll functions to react to extern events, such as device driver + * status changes, the engine will poll(2) the @fds of the poll function and invoke + * the callback with @revents_filled==%TRUE if any of its @fds changed state. + * The callback may also be invoked at other random times with @revents_filled=%FALSE. + * It is supposed to return %TRUE if network processing is currently necessary, and + * %FALSE if not. + * If %FALSE is returned, @timeout_p may be filled with the number of milliseconds + * the engine should use for polling at maximum. + */ +/** + * gsl_job_add_poll + * @poll_func: Poll function to add + * @data: Data of poll function + * @free_func: Function to free @data + * @n_fds: Number of poll file descriptors + * @fds: File descriptors to select(2) or poll(2) on + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which adds a poll function + * to the engine. The poll function is used by the engine to + * determine whether processing is currently necessary. + */ +GslJob* +gsl_job_add_poll (GslPollFunc poll_func, + gpointer data, + GslFreeFunc free_func, + guint n_fds, + const GPollFD *fds) +{ + GslJob *job; + + g_return_val_if_fail (poll_func != NULL, NULL); + if (n_fds) + g_return_val_if_fail (fds != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_ADD_POLL; + job->data.poll.poll_func = poll_func; + job->data.poll.data = data; + job->data.poll.n_fds = n_fds; + job->data.poll.fds = g_memdup (fds, sizeof (fds[0]) * n_fds); + + return job; +} + +/** + * gsl_job_remove_poll + * @poll_func: Poll function to remove + * @data: Data of poll function + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which removes a previously inserted poll + * function from the engine. + */ +GslJob* +gsl_job_remove_poll (GslPollFunc poll_func, + gpointer data) +{ + GslJob *job; + + g_return_val_if_fail (poll_func != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_REMOVE_POLL; + job->data.poll.poll_func = poll_func; + job->data.poll.data = data; + job->data.poll.free_func = NULL; + job->data.poll.fds = NULL; + + return job; +} + +/** + * gsl_job_debug + * @debug: Debug message + * @Returns: New job suitable for gsl_trans_add() + * + * Create a new transaction job which issues @debug message when + * the job is executed. This function is meant for debugging purposes + * during development phase only and shouldn't be used in production code. + */ +GslJob* +gsl_job_debug (const gchar *debug) +{ + GslJob *job; + + g_return_val_if_fail (debug != NULL, NULL); + + job = gsl_new_struct0 (GslJob, 1); + job->job_id = ENGINE_JOB_DEBUG; + job->data.debug = g_strdup (debug); + + return job; +} + +/** + * gsl_trans_open + * @Returns: Newly opened empty transaction + * + * Open up a new transaction to commit jobs to the GSL engine. + * This function may cause garbage collection (see + * gsl_engine_garbage_collect()). + */ +GslTrans* +gsl_trans_open (void) +{ + GslTrans *trans; + + gsl_engine_garbage_collect (); + + trans = gsl_new_struct0 (GslTrans, 1); + + trans->jobs_head = NULL; + trans->jobs_tail = NULL; + trans->comitted = FALSE; + trans->cqt_next = NULL; + + return trans; +} + +/** + * gsl_trans_add + * @trans: Opened transaction + * @job: Job to add + * + * Append a job to an opened transaction. + */ +void +gsl_trans_add (GslTrans *trans, + GslJob *job) +{ + g_return_if_fail (trans != NULL); + g_return_if_fail (trans->comitted == FALSE); + g_return_if_fail (job != NULL); + g_return_if_fail (job->next == NULL); + + if (trans->jobs_tail) + trans->jobs_tail->next = job; + else + trans->jobs_head = job; + trans->jobs_tail = job; +} + +/** + * gsl_trans_commit + * @trans: Opened transaction + * + * Close the transaction and commit it to the engine. The engine + * will execute the jobs contained in this transaction as soon as + * it has completed its current processing cycle. The jobs will be + * executed in the exact order they were added to the transaction. + */ +void +gsl_trans_commit (GslTrans *trans) +{ + g_return_if_fail (trans != NULL); + g_return_if_fail (trans->comitted == FALSE); + g_return_if_fail (trans->cqt_next == NULL); + + if (trans->jobs_head) + { + trans->comitted = TRUE; + _engine_enqueue_trans (trans); + wakeup_master (); + } + else + gsl_trans_dismiss (trans); +} + +/** + * gsl_trans_dismiss + * @trans: Opened transaction + * + * Close and discard the transaction, destroy all jobs currently + * contained in it and do not execute them. + * This function may cause garbage collection (see + * gsl_engine_garbage_collect()). + */ +void +gsl_trans_dismiss (GslTrans *trans) +{ + g_return_if_fail (trans != NULL); + g_return_if_fail (trans->comitted == FALSE); + g_return_if_fail (trans->cqt_next == NULL); + + _engine_free_trans (trans); + + gsl_engine_garbage_collect (); +} + +/** + * gsl_transact + * @job: First job + * @...: %NULL terminated job list + * + * Convenience function which openes up a new transaction, + * collects the %NULL terminated job list passed to the function, + * and commits the transaction. + */ +void +gsl_transact (GslJob *job, + ...) +{ + GslTrans *trans = gsl_trans_open (); + va_list var_args; + + va_start (var_args, job); + while (job) + { + gsl_trans_add (trans, job); + job = va_arg (var_args, GslJob*); + } + va_end (var_args); + gsl_trans_commit (trans); +} + + +/* --- initialization --- */ +static void +slave (gpointer data) +{ + gboolean run = TRUE; + + while (run) + { + GslTrans *trans = gsl_trans_open (); + gchar *str = g_strdup_printf ("SLAVE(%p): idle", g_thread_self ()); + + gsl_trans_add (trans, gsl_job_debug (str)); + g_free (str); + gsl_trans_add (trans, gsl_job_debug ("string2")); + gsl_trans_commit (trans); + + trans = gsl_trans_open (); + gsl_trans_add (trans, gsl_job_debug ("trans2")); + gsl_trans_commit (trans); + + g_usleep (1000*500); + } +} + +/* --- setup & trigger --- */ +static gboolean gsl_engine_initialized = FALSE; +static gboolean gsl_engine_threaded = FALSE; +static GslThread *master_thread = NULL; +guint gsl_externvar_bsize = 0; +guint gsl_externvar_sample_freq = 0; +guint gsl_externvar_sub_sample_mask = 0; +guint gsl_externvar_sub_sample_steps = 0; + +/** + * gsl_engine_init + * @block_size: number of values to process block wise + * + * Initialize the GSL engine, this function must be called prior to + * any other engine related function and can only be invoked once. + * The @block_size determines the amount by which the global tick + * stamp (see gsl_tick_stamp()) is updated every time the whole + * module network completed processing @block_size values. + */ +void +gsl_engine_init (gboolean run_threaded, + guint block_size, + guint sample_freq, + guint sub_sample_mask) +{ + g_return_if_fail (gsl_engine_initialized == FALSE); + g_return_if_fail (block_size > 0 && block_size <= GSL_STREAM_MAX_VALUES); + g_return_if_fail (sample_freq > 0); + g_return_if_fail (sub_sample_mask < block_size); + g_return_if_fail ((sub_sample_mask & (sub_sample_mask + 1)) == 0); /* power of 2 */ + + gsl_engine_initialized = TRUE; + gsl_engine_threaded = run_threaded; + gsl_externvar_bsize = block_size; + gsl_externvar_sample_freq = sample_freq; + gsl_externvar_sub_sample_mask = sub_sample_mask << 2; /* shift out sizeof (float) alignment */ + gsl_externvar_sub_sample_steps = sub_sample_mask + 1; + _gsl_tick_stamp_set_leap (block_size); + + ENG_DEBUG ("initialization: threaded=%s", gsl_engine_threaded ? "TRUE" : "FALSE"); + + if (gsl_engine_threaded) + { + if (!g_thread_supported ()) g_thread_init (NULL); + master_thread = gsl_thread_new (_engine_master_thread, NULL); + if (0) + gsl_thread_new (slave, NULL); + } +} + +static void +wakeup_master (void) +{ + if (master_thread) + gsl_thread_wakeup (master_thread); +} + +gboolean +gsl_engine_prepare (GslEngineLoop *loop) +{ + g_return_val_if_fail (loop != NULL, FALSE); + g_return_val_if_fail (gsl_engine_initialized == TRUE, FALSE); + + if (!gsl_engine_threaded) + return _engine_master_prepare (loop); + else + { + loop->timeout = -1; + loop->fds_changed = FALSE; + loop->n_fds = 0; + loop->revents_filled = FALSE; + return FALSE; + } +} + +gboolean +gsl_engine_check (const GslEngineLoop *loop) +{ + g_return_val_if_fail (loop != NULL, FALSE); + if (loop->n_fds) + g_return_val_if_fail (loop->revents_filled == TRUE, FALSE); + + if (!gsl_engine_threaded) + return _engine_master_check (loop); + else + return FALSE; +} + +void +gsl_engine_dispatch (void) +{ + g_return_if_fail (gsl_engine_initialized == TRUE); + + if (!gsl_engine_threaded) + _engine_master_dispatch (); +} + +/** + * gsl_engine_wait_on_trans + * + * Wait until all pending transactions have been processed + * by the GSL Engine. + * This function may cause garbage collection (see + * gsl_engine_garbage_collect()). + */ +void +gsl_engine_wait_on_trans (void) +{ + g_return_if_fail (gsl_engine_initialized == TRUE); + + /* non-threaded */ + if (!gsl_engine_threaded) + _engine_master_dispatch_jobs (); + + /* threaded */ + _engine_wait_on_trans (); + + /* call all free() functions */ + gsl_engine_garbage_collect (); +} + +/* vim:set ts=8 sts=2 sw=2: */ |