Python C extension: multithreading and random numbers

I have implemented a work queue pattern in C (within a Python extension) and I am disappointed with its performance.

I have a simulation with a list of particles ("elements"). I benchmark the time taken to perform all the calculations required for a timestep and record it along with the number of particles involved. I am running the code on a quad-core hyperthreaded i7, so I expected performance to rise (time taken to fall) with the number of threads up to about 8. Instead, the fastest implementation has no worker threads (functions are simply executed instead of being added to the queue), and the code gets slower with each worker thread added: each new thread costs more than the total time of the unthreaded implementation! I've had a quick look at my processor usage monitor, and Python never seems to exceed 130% CPU usage, regardless of how many threads are running. The machine has plenty of headroom above that, with overall system usage at about 200%.

Now, part of my queue implementation (shown below) chooses an item at random from the queue, since each work item's execution requires a lock on two elements, and similar elements will be near each other in the queue. Thus, I want the threads to pick random indices and attack different parts of the queue to minimise mutex clashes.

I've read that my initial attempt with rand() will have been slow because rand() isn't thread safe (does that sentence make sense? not sure...)

I've tried the implementation with both random() and drand48_r (although, unfortunately, the latter seems to be unavailable on OS X), but neither made any difference to the timings.

Perhaps someone else can tell me what might be causing the problem? The code (worker function) is below; do shout if you think any of the queue_add functions or constructors would be useful to see too.

void* worker_thread_function(void* untyped_queue) {

  queue_t* queue = (queue_t*)untyped_queue;
  int success = 0;
  int rand_id;
  long int temp;
  work_item_t* work_to_do = NULL;
  int work_items_completed = 0;

  while (1) {
    if (pthread_mutex_lock(queue->mutex)) {

      // error case, try again:
      continue;
    }

    while (!success) {

      if (queue->queue->count == 0) {

        pthread_mutex_unlock(queue->mutex);
        break;
      }

      // choose a random item from the work queue, in order to avoid clashing element mutexes.
      rand_id = random() % queue->queue->count;

      if (!pthread_mutex_trylock(((work_item_t*)queue->queue->items[rand_id])->mutex)) {

        // obtain mutex locks on both elements for the work item.
        work_to_do = (work_item_t*)queue->queue->items[rand_id];

        if (!pthread_mutex_trylock(((element_t*)work_to_do->element_1)->mutex)) {
          if (!pthread_mutex_trylock(((element_t*)work_to_do->element_2)->mutex)) {

            success = 1;
          } else {

            // only locked element_1 and work item:
            pthread_mutex_unlock(((element_t*)work_to_do->element_1)->mutex);
            pthread_mutex_unlock(work_to_do->mutex);
            work_to_do = NULL;
          }
        } else {

          // couldn't lock element_1, didn't even try 2:
          pthread_mutex_unlock(work_to_do->mutex);
          work_to_do = NULL;
        }
      }
    }

    if (work_to_do == NULL) {
      if (queue->queue->count == 0 && queue->exit_flag) {

        break;
      } else {

        continue;
      }
    }

    queue_remove_work_item(queue, rand_id, NULL, 1);
    pthread_mutex_unlock(work_to_do->mutex);

    pthread_mutex_unlock(queue->mutex);

    // At this point, we have mutex locks for the two elements in question, and a
    // work item no longer visible to any other threads. we have also unlocked the main
    // shared queue, and are free to perform the work on the elements.
    execute_function(
      work_to_do->interaction_function,
      (element_t*)work_to_do->element_1,
      (element_t*)work_to_do->element_2,
      (simulation_parameters_t*)work_to_do->params
    );

    // now finished, we should unlock both the elements:
    pthread_mutex_unlock(((element_t*)work_to_do->element_1)->mutex);
    pthread_mutex_unlock(((element_t*)work_to_do->element_2)->mutex);

    // and release the work_item RAM:
    work_item_destroy((void*)work_to_do);
    work_to_do = NULL;

    work_items_completed++;
    success = 0;
  }
  return NULL;
}

Answers


It doesn't seem like random() is your problem, since that code is the same regardless of the number of threads. Since performance goes down as the number of threads goes up, you are most likely getting killed by locking overhead. Do you really need multiple threads? How long does the work function take, and what is your average queue depth? Selecting items randomly seems like a bad idea. If the queue count is <= 2, you definitely don't need to do the rand calculation. Also, instead of randomly selecting a queue index, it would be better to use a separate queue per worker thread and insert in a round-robin fashion. Or, at least, do something simple like remembering the last index claimed by the previous thread and not picking that one.
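As a rough illustration of the queue-per-worker suggestion, here is a minimal sketch. It is not the asker's queue_t: the names dispatcher_t, worker_queue_t, NUM_WORKERS and QUEUE_CAPACITY are hypothetical, and it assumes a single producer thread and a fixed-size queue for brevity.

```c
#include <pthread.h>
#include <stddef.h>

#define NUM_WORKERS 4       /* assumption: fixed number of workers */
#define QUEUE_CAPACITY 256  /* assumption: bounded queue, for brevity */

/* Hypothetical per-worker queue. Each worker pops only from its own queue,
   so workers never compete with each other for a queue mutex. */
typedef struct {
    pthread_mutex_t mutex;
    void *items[QUEUE_CAPACITY];
    size_t count;
} worker_queue_t;

typedef struct {
    worker_queue_t queues[NUM_WORKERS];
    size_t next;  /* queue that receives the next item (producer only) */
} dispatcher_t;

static void dispatcher_init(dispatcher_t *d) {
    for (size_t i = 0; i < NUM_WORKERS; i++) {
        pthread_mutex_init(&d->queues[i].mutex, NULL);
        d->queues[i].count = 0;
    }
    d->next = 0;
}

/* Called by the single producer. Returns 0 on success, -1 if the target
   queue is full. Advances round-robin only after a successful insert. */
static int dispatcher_add(dispatcher_t *d, void *item) {
    worker_queue_t *q = &d->queues[d->next];
    int rc = -1;
    pthread_mutex_lock(&q->mutex);
    if (q->count < QUEUE_CAPACITY) {
        q->items[q->count++] = item;
        rc = 0;
    }
    pthread_mutex_unlock(&q->mutex);
    if (rc == 0) {
        d->next = (d->next + 1) % NUM_WORKERS;
    }
    return rc;
}

/* Called by worker `id`. Returns the most recently added item of that
   worker's queue, or NULL when the queue is empty. */
static void *worker_queue_pop(dispatcher_t *d, size_t id) {
    worker_queue_t *q = &d->queues[id];
    void *item = NULL;
    pthread_mutex_lock(&q->mutex);
    if (q->count > 0) {
        item = q->items[--q->count];
    }
    pthread_mutex_unlock(&q->mutex);
    return item;
}
```

With this layout each queue mutex is shared only between the producer and one worker. The per-element mutexes would still be needed, because two workers can hold work items that refer to the same element.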


Python threads don't run in parallel. In CPython each thread is a real OS thread, but the GIL (Global Interpreter Lock) lets only one of them execute Python code at a time. Rewriting your code with processes might do the trick, if the workers are relatively long-lived in this context.

Wikipedia's page on GIL

----Edit----

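Right, this was in C. But the GIL still matters: threads in a C extension only run in parallel while they do not hold the GIL, and code that touches no Python objects can release it with the Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS macros. Info on threads in C extensions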


You would have to benchmark to know whether this is the bottleneck in your program, but it may well be.

random() and friends, which keep a hidden state variable, can be severe bottlenecks in parallel programs. If they are made thread safe, this is usually done by simply putting a mutex around the access, so everything slows down.

The portable choice for a thread safe random generator on POSIX systems is erand48. In contrast to drand48 it receives the state variable as an argument. You'd just have to keep a state variable on the stack of each thread (it is an unsigned short[3]) and call erand48 with that.

Also keep in mind that these are pseudo-random generators. If different threads start from the same state values, they produce identical sequences, so their random numbers are not independent.
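A minimal sketch of the per-thread erand48 approach might look like the following. The names thread_rng_t, thread_rng_init and thread_rng_index are hypothetical helpers, and mixing the worker index into the seed is only one simple way to give each thread a different starting state.

```c
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600  /* may be needed to expose erand48 in strict modes */
#endif
#include <stdlib.h>
#include <stddef.h>

/* Hypothetical per-thread generator: erand48 keeps its state in three
   16-bit words supplied by the caller, so no hidden global state is shared. */
typedef struct {
    unsigned short state[3];
} thread_rng_t;

/* Give each thread a different starting state by mixing in its index
   (assumption: thread_index is small and unique per worker). */
static void thread_rng_init(thread_rng_t *rng,
                            unsigned int thread_index,
                            unsigned int base_seed) {
    rng->state[0] = 0x330E;
    rng->state[1] = (unsigned short)(base_seed & 0xFFFFu);
    rng->state[2] = (unsigned short)(((base_seed >> 16) ^ thread_index) & 0xFFFFu);
}

/* Return an index in [0, count). The caller must ensure count > 0. */
static size_t thread_rng_index(thread_rng_t *rng, size_t count) {
    /* erand48 returns a double in [0.0, 1.0) */
    size_t idx = (size_t)(erand48(rng->state) * (double)count);
    return idx < count ? idx : count - 1;  /* guard against rounding up */
}
```

In the worker function from the question, each thread would declare its own thread_rng_t on the stack, initialise it once before the main loop, and call thread_rng_index(&rng, queue->queue->count) in place of random() % queue->queue->count.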

