Ruby - Threads and Dir[] arrays -
i have built ruby script creates folders, moves file new folder , invokes system()
call trigger ffmpeg. have turned 4 threads can 4 concurrent transcodes @ time.
here example 2 threads (minus folder structure creation , file move functions):
def transcode_process_1 dir["path/file/source/*.mxf"].each |f| random_folder = #code random folder creation file_move = #code move .mxf file random_folder processing system("ffmpeg -i #{random_folder} command 2> /path/file/#{random_filename}.txt") sleep(2) end end def transcode_process_2 sleep(3) dir["path/file/source/*.mxf"].each |f| random_folder = #code random folder creation file_move = #code move .mxf file random_folder processing system("ffmpeg -i #{random_folder} command 2> /path/file/#{random_filename}.txt") sleep(4) end end transcode_thread_1 = thread.new{transcode_process_1()} transcode_thread_2 = thread.new{transcode_process_2()} transcode_thread_1.join transcode_thread_2.join
this iterates through dir "path/file/source/" , processes .mxf files finds. issue having when both threads running adding same files array. results in ffmpeg stating cannot locate file (this because thread has processed , moved temp folder) , creating superfluous folders , log files, making messy.
my question how go making sure transcode_thread_2
not process files transcode_thread_1
has added array? there way can function check file in array still exists? if carry out process, if not move on next file?
thanks
there 2 ways fix this, consider second way superior.
1 - check if file exists before proceeding
simply add file.exist?(f)
check code:
def transcode_process_1 dir["path/file/source/*.mxf"].each |f| next if !file.exist?(f) ..carry on normal..
2 - use queue (part of threads package)
you can use queue, accessible threads. simplifies things in number of ways
- central list of items process
- accessible threads, can spawn many threads can handle
- removes multiple versions of same code , code smell sleep values
something along lines of:
require 'thread' total_threads = 2 # or cores have transcode_queue = queue.new transcode_queue << dir["path/file/source/*.mxf"] def transcode_process(queue) while !queue.empty? file = queue.pop # process file random_folder = '' #code random folder creation random_filename = '' # missing example file_move = '' #code move .mxf file random_folder processing system("ffmpeg -i #{random_folder} command 2> /path/file/#{random_filename}.txt") end end (1..total_threads).each { thread.new { transcode_process(transcode_queue) } }
you don't need sleep()
there should no race condition anymore having accessed queue, , you've shared same code each new thread.
Comments
Post a Comment