multithreading - Haskell - Actor based mutability


I'm working on a Haskell network application, and I use the actor pattern to manage multithreading. One thing I came across is how to store, for example, a set of client sockets/handles. Of course, these must be accessible to all threads and can change when clients log on/off.

Since I'm coming from the imperative world, I first thought about some kind of lock mechanism, but when I noticed how ugly that is, I thought about "pure" mutability. Well, actually it's only kind of pure:

    import Control.Concurrent
    import Control.Monad
    import Network
    import System.IO
    import Data.List
    import Data.Maybe
    import System.Environment
    import Control.Exception

    -- Start the storage actor and return the channel used to talk to it.
    newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a))
    newStorage = do
      q <- newChan
      forkIO $ storage [] q
      return q

    newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle))
    newHandleStorage = newStorage

    -- The actor loop: the list 's' is only ever touched by this thread.
    storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO ()
    storage s q = do
      let loop = (`storage` q)
      (req, reply, d) <- readChan q
      print ("processing " ++ show(d))
      case req of
        "add" -> loop ((fromJust d) : s)
        "remove" -> loop (delete (fromJust d) s)
        "get" -> do
          writeChan (fromJust reply) s
          loop s

    -- Client-side helpers that send requests to the actor.
    store s d = writeChan s ("add", Nothing, Just d)
    unstore s d = writeChan s ("remove", Nothing, Just d)
    request s = do
      chan <- newChan
      writeChan s ("get", Just chan, Nothing)
      readChan chan

The point is that there is a thread (actor) which manages a list of items and modifies the list according to incoming requests. Since threads are really cheap, I thought this could be a really nice functional alternative.
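The same idea can be sketched with a typed message instead of the `(String, Maybe ..., Maybe ...)` tuple, so that an unknown request string or a missing payload cannot occur at run time. This is only a minimal sketch using `base`; the names `Msg`, `newStore`, `snapshot` and `demo` are hypothetical and not part of the code above, and the reply goes through an `MVar` rather than a second `Chan`.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.List (delete)

-- Hypothetical message type: one constructor per request.
data Msg a
    = Add a
    | Remove a
    | Get (MVar [a])   -- the actor answers by filling this MVar

-- Start the actor thread and return its mailbox.
newStore :: Eq a => IO (Chan (Msg a))
newStore = do
    inbox <- newChan
    _ <- forkIO (loop inbox [])
    return inbox
  where
    -- The list lives only in the argument of 'loop',
    -- so no other thread can touch it directly.
    loop inbox items = do
        msg <- readChan inbox
        case msg of
            Add x     -> loop inbox (x : items)
            Remove x  -> loop inbox (delete x items)
            Get reply -> putMVar reply items >> loop inbox items

store, unstore :: Chan (Msg a) -> a -> IO ()
store   inbox x = writeChan inbox (Add x)
unstore inbox x = writeChan inbox (Remove x)

-- Ask for a snapshot of the list and block until the actor replies.
snapshot :: Chan (Msg a) -> IO [a]
snapshot inbox = do
    reply <- newEmptyMVar
    writeChan inbox (Get reply)
    takeMVar reply

-- Small usage example: add three items, remove one, read the rest.
demo :: IO [Int]
demo = do
    s <- newStore
    mapM_ (store s) [1, 2, 3]
    unstore s 2
    snapshot s
```

Because a `Chan` delivers messages from a single sender in order, `demo` sees the effect of all four updates before the snapshot is taken.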

Of course, this is just a prototype (a quick and dirty proof of concept). So my question is:

  1. Is this a "good" way of managing shared mutable variables (in the actor world)?
  2. Is there already a library for this pattern? (I already searched but found nothing.)

Regards, Chris

Here is a quick and dirty example using stm and pipes-network. It sets up a simple server that allows clients to connect and increment or decrement a counter. It displays a very simple status bar showing the current tallies of all connected clients and removes a client's tally from the bar when that client disconnects.

First I will begin with the server, and I've generously commented the code to explain how it works:

    import Control.Concurrent.STM (STM, atomically)
    import Control.Concurrent.STM.TVar
    import qualified Data.HashMap.Strict as H
    import Data.Foldable (forM_)

    import Control.Concurrent (forkIO, threadDelay)
    import Control.Monad (forever, unless)
    import Control.Monad.Trans.State.Strict
    import qualified Data.ByteString.Char8 as B
    import Control.Proxy
    import Control.Proxy.TCP
    import System.IO

    main = do
        hSetBuffering stdout NoBuffering

        {- These are the internal data structures.  They should be an
           implementation detail and you should never expose these references
           to the "business logic" part of the application. -}
        -- Use nRef to keep track of creating fresh Ints (which identify users)
        nRef <- newTVarIO 0       :: IO (TVar Int)
        {- hMap associates every user (i.e. Int) with a counter

           Notice how I've "striped" the hash map by storing STM references to
           the values instead of storing the values directly.  This means that
           I only write to the hash map when adding or removing users, which
           reduces contention for the hash map.

           Since each user gets their own unique STM reference for their
           counter, modifying counters does not cause contention with other
           counters or contention with the hash map. -}
        hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

        {- The following code makes heavy use of Haskell's pure closures.  Each
           'let' binding closes over its current environment, which is safe
           since Haskell is pure. -}

        let {- 'getCounters' is the only server-facing command in our STM API.
               The only permitted operation is retrieving the current set of
               user counters.

               'getCounters' closes over the 'hMap' reference currently in
               scope so that the server never needs to be aware of our internal
               implementation. -}
            getCounters :: STM [Int]
            getCounters = do
                refs <- fmap H.elems (readTVar hMap)
                mapM readTVar refs

            {- 'init' is the only client-facing command in our STM API.  It
               initializes the client's entry in the hash map and returns two
               commands: the first is what the client calls to 'increment' its
               counter and the second is what the client calls to log off and
               delete its entry (the 'delete' command).

               Notice that the two returned commands each close over the
               client's unique STM reference, so the client never needs to be
               aware of how exactly 'init' is implemented under the hood. -}
            init :: STM (STM (), STM ())
            init = do
                n   <- readTVar nRef
                writeTVar nRef $! n + 1

                ref <- newTVar 0
                modifyTVar' hMap (H.insert n ref)

                let incrementRef :: STM ()
                    incrementRef = do
                        mRef <- fmap (H.lookup n) (readTVar hMap)
                        forM_ mRef $ \ref -> modifyTVar' ref (+ 1)

                    deleteRef :: STM ()
                    deleteRef = modifyTVar' hMap (H.delete n)

                return (incrementRef, deleteRef)

        {- Now for the actual program logic.  Everything past this point only
           uses the approved STM API (i.e. 'getCounters' and 'init').  If I
           wanted, I could factor the above approved STM API into a separate
           module to enforce the encapsulation boundary, but I am lazy. -}

        {- Fork a thread which polls the current state of the counters and
           displays it on the console.  There is a way to implement this
           without polling, but this gets the job done for now.

           Most of what it is doing is just some simple tricks to reuse the
           same console line instead of outputting a stream of lines.
           Otherwise it would be just:

           forkIO $ forever $ do
               ns <- atomically getCounters
               print ns
        -}
        forkIO $ (`evalStateT` 0) $ forever $ do
            del <- get
            lift $ do
                putStr (replicate del '\b')
                putStr (replicate del ' ' )
                putStr (replicate del '\b')
            ns <- lift $ atomically getCounters
            let str = show ns
            lift $ putStr str
            put $! length str
            lift $ threadDelay 10000

        {- Fork a thread for each incoming connection, which listens to the
           client's commands and translates them into 'STM' actions -}
        serve HostAny "8080" $ \(socket, _) -> do
            (increment, delete) <- atomically init

            {- Right now, just do the dumb thing and convert all keypresses
               into increment commands, with the exception of the 'q' key,
               which will quit -}
            let handler :: (Proxy p) => () -> Consumer p Char IO ()
                handler () = runIdentityP loop
                  where
                    loop = do
                        c <- request ()
                        unless (c == 'q') $ do
                            lift $ atomically increment
                            loop

            {- This uses the 'pipes' library.  It is basically a high-level
               way to say:

               * Read binary packets from the socket no bigger than 4096 bytes

               * Get the first character from each packet and discard the rest

               * Handle the character using the above 'handler' function -}
            runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

            {- The above pipeline finishes either when the socket closes or
               'handler' stops looping because it received a 'q'.  Either case
               means that the client is done, so we log them out using
               'delete'. -}
            atomically delete
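The "striped" layout described in the server's comments can be tried out in isolation, without pipes or any networking. The following is only a sketch under a few assumptions: it swaps `Data.HashMap.Strict` for `Data.Map.Strict` from `containers` to keep dependencies small, and the names `Registry`, `newRegistry`, `register` and `demo` are hypothetical. Unlike `incrementRef` in the server, the increment action here writes to the client's `TVar` directly instead of looking it up in the map first.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

-- Hypothetical registry: user id -> that user's own counter reference.
data Registry = Registry
    { nextId   :: TVar Int
    , counters :: TVar (M.Map Int (TVar Int))
    }

newRegistry :: IO Registry
newRegistry = Registry <$> newTVarIO 0 <*> newTVarIO M.empty

-- Register a client and return (increment, delete) actions that close
-- over the client's id and counter, like 'init' in the server.
register :: Registry -> STM (STM (), STM ())
register reg = do
    n <- readTVar (nextId reg)
    writeTVar (nextId reg) $! n + 1
    ref <- newTVar 0
    -- The map itself is written only when clients come and go.
    modifyTVar' (counters reg) (M.insert n ref)
    return (modifyTVar' ref (+ 1), modifyTVar' (counters reg) (M.delete n))

-- Read all counters in one consistent transaction, ordered by user id.
getCounters :: Registry -> STM [Int]
getCounters reg = readTVar (counters reg) >>= mapM readTVar . M.elems

-- Two clients: A increments once, B twice, then A logs off.
demo :: IO ([Int], [Int])
demo = do
    reg <- newRegistry
    (incA, delA) <- atomically (register reg)
    (incB, _)    <- atomically (register reg)
    atomically (incA >> incB >> incB)
    before <- atomically (getCounters reg)
    atomically delA
    after <- atomically (getCounters reg)
    return (before, after)
```

Running `demo` gives the tallies before and after client A logs off, which mirrors how the status bar shrinks when a client disconnects.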

Next is the client, which just opens a connection and forwards key presses as single packets:

    import Control.Monad
    import Control.Proxy
    import Control.Proxy.Safe
    import Control.Proxy.TCP.Safe
    import Data.ByteString.Char8 (pack)
    import System.IO

    main = do
        hSetBuffering stdin NoBuffering
        hSetEcho      stdin False

        {- Again, this uses the 'pipes' library.  It basically says:

            * Read characters from the console using 'commands'

            * Pack them into a binary format

            * Send them to a server running at 127.0.0.1:8080

            This finishes looping when the user types a 'q' or the connection
            is closed for whatever reason.
        -}
        runSafeIO $ runProxy $ runEitherK $
             try . commands
         >-> mapD (\c -> pack [c])
         >-> connectWriteD Nothing "127.0.0.1" "8080"

    commands :: (Proxy p) => () -> Producer p Char IO ()
    commands () = runIdentityP loop
      where
        loop = do
            c <- lift getChar
            respond c
            unless (c == 'q') loop

It's pretty simple: commands generates a stream of Chars, which get converted to ByteStrings and then sent as packets to the server.

If you run the server and a few clients and have each of them type in a few keys, the server will display an output list showing how many keys each client typed:
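The wire format is therefore one byte per key press. As a small sketch of both directions, using only `bytestring` (the names `encodeKey` and `decodeKey` are hypothetical): the decoder uses `B.uncons` rather than `B.head`, on the assumption that an empty packet should be ignored instead of raising an error.

```haskell
import qualified Data.ByteString.Char8 as B

-- Client side: one key press becomes a one-byte packet.
-- Note that Data.ByteString.Char8 truncates characters to 8 bits.
encodeKey :: Char -> B.ByteString
encodeKey c = B.pack [c]

-- Server side: take the first character of a packet and discard the rest.
-- Returns Nothing for an empty packet, where 'B.head' would fail.
decodeKey :: B.ByteString -> Maybe Char
decodeKey = fmap fst . B.uncons
```

For ASCII key presses, `decodeKey (encodeKey c)` gives back `Just c`.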

[1,6,4] 

... and if some of the clients disconnect, they will be removed from the list:

[1,4] 

Note that the pipes component of these examples will simplify greatly in the upcoming pipes-4.0.0 release, but the current pipes ecosystem still gets the job done as is.

