I'm working on a Haskell network application, and I use the actor pattern to manage multithreading. One thing I came across is how to store, for example, a set of client sockets/handles. It must of course be accessible to all threads, and it can change when clients log on or off.
Since I'm coming from the imperative world, I first thought about some kind of lock mechanism, but when I noticed how ugly that is, I thought about "pure" mutability. Well, actually it's only kind of pure:
    import Control.Concurrent
    import Control.Monad
    import Network
    import System.IO
    import Data.List
    import Data.Maybe
    import System.Environment
    import Control.Exception

    -- Start the storage actor and return the channel used to talk to it.
    newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a))
    newStorage = do
      q <- newChan
      forkIO $ storage [] q
      return q

    newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle))
    newHandleStorage = newStorage

    -- The actor loop: 's' is the current list, 'q' the request channel.
    storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO ()
    storage s q = do
      let loop = (`storage` q)
      (req, reply, d) <- readChan q
      print ("processing " ++ show d)
      case req of
        "add"    -> loop (fromJust d : s)
        "remove" -> loop (delete (fromJust d) s)
        "get"    -> do
          writeChan (fromJust reply) s
          loop s

    -- The payload and the reply channel must be wrapped in 'Just' to match
    -- the 'Maybe' fields of the request tuple.
    store s d   = writeChan s ("add", Nothing, Just d)
    unstore s d = writeChan s ("remove", Nothing, Just d)
    request s   = do
      chan <- newChan
      writeChan s ("get", Just chan, Nothing)
      readChan chan
The point is to have one thread (actor) that manages a list of items and modifies the list according to incoming requests. Since threads are really cheap, I thought this could be a really nice functional alternative.
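For comparison, here is a minimal sketch of the same storage actor with the string tags replaced by an algebraic data type. The names `Request`, `Add`, `Remove`, `Get` and `snapshot` are made up for this illustration, and the reply goes through an `MVar` rather than a second `Chan`; it is one possible shape under those assumptions, not an established library API.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.List (delete)

-- Hypothetical request type: one constructor per operation, so a request
-- with a missing payload or reply slot cannot be constructed.
data Request a
  = Add a
  | Remove a
  | Get (MVar [a])   -- the actor puts its current list into this MVar

-- Start the actor thread and return the channel used to talk to it.
newStorage :: Eq a => IO (Chan (Request a))
newStorage = do
  q <- newChan
  _ <- forkIO (loop q [])
  return q
  where
    -- The list lives only in the argument of this loop.
    loop q items = do
      req <- readChan q
      case req of
        Add x     -> loop q (x : items)
        Remove x  -> loop q (delete x items)
        Get reply -> putMVar reply items >> loop q items

store, unstore :: Chan (Request a) -> a -> IO ()
store   q x = writeChan q (Add x)
unstore q x = writeChan q (Remove x)

-- Ask the actor for a snapshot and block until it answers.
snapshot :: Chan (Request a) -> IO [a]
snapshot q = do
  reply <- newEmptyMVar
  writeChan q (Get reply)
  takeMVar reply

main :: IO ()
main = do
  q <- newStorage
  mapM_ (store q) [1, 2, 3 :: Int]
  unstore q 2
  snapshot q >>= print   -- prints [3,1]
```

Because the channel is FIFO and only the actor thread touches the list, no lock is needed; the trade-off is that every read is a round trip through the actor.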
Of course this is just a prototype (a quick and dirty proof of concept). So my question is:
- Is this a "good" way of managing shared mutable variables (in the actor world)?
- Is there already a library for this pattern? (I searched but found nothing.)
Regards, Chris
Here is a quick and dirty example using stm and pipes-network. It sets up a simple server that allows clients to connect and increment or decrement a counter. It displays a very simple status bar showing the current tallies of all connected clients, and it removes a client's tally from the bar when that client disconnects.
First I will begin with the server. I've generously commented the code to explain how it works:
    import Control.Concurrent.STM (STM, atomically)
    import Control.Concurrent.STM.TVar
    import qualified Data.HashMap.Strict as H
    import Data.Foldable (forM_)

    import Control.Concurrent (forkIO, threadDelay)
    import Control.Monad (unless)
    import Control.Monad.Trans.State.Strict
    import qualified Data.ByteString.Char8 as B
    import Control.Proxy
    import Control.Proxy.TCP
    import System.IO

    main = do
        hSetBuffering stdout NoBuffering

        {- These are the internal data structures.  They should be an
           implementation detail and you should never expose these references
           to the "business logic" part of the application. -}
        -- I use nRef to keep track of creating fresh Ints (which identify users)
        nRef <- newTVarIO 0       :: IO (TVar Int)
        {- hMap associates every user (i.e. Int) with a counter

           Notice how I've "striped" the hash map by storing STM references to
           the values instead of storing the values directly.  This means that
           I only write to the hash map when adding or removing users, which
           reduces contention for the hash map.

           Since each user gets their own unique STM reference for their
           counter, modifying counters does not cause contention with other
           counters or contention with the hash map. -}
        hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

        {- The following code makes heavy use of Haskell's pure closures.  Each
           'let' binding closes over its current environment, which is safe
           since Haskell is pure. -}

        let {- 'getCounters' is the only server-facing command in our STM API.
               The only permitted operation is retrieving the current set of
               user counters.

               'getCounters' closes over the 'hMap' reference currently in
               scope so that the server never needs to be aware of our
               internal implementation. -}
            getCounters :: STM [Int]
            getCounters = do
                refs <- fmap H.elems (readTVar hMap)
                mapM readTVar refs

            {- 'init' is the only client-facing command in our STM API.  It
               initializes the client's entry in the hash map and returns two
               commands: the first is what the client calls to 'increment'
               their counter and the second is what the client calls to log
               off and delete their entry (the 'delete' command).

               Notice that those two returned commands each close over the
               client's unique STM reference, so the client never needs to be
               aware of how 'init' is implemented under the hood. -}
            init :: STM (STM (), STM ())
            init = do
                n   <- readTVar nRef
                writeTVar nRef $! n + 1

                ref <- newTVar 0
                modifyTVar' hMap (H.insert n ref)

                let incrementRef :: STM ()
                    incrementRef = do
                        mRef <- fmap (H.lookup n) (readTVar hMap)
                        forM_ mRef $ \ref -> modifyTVar' ref (+ 1)

                    deleteRef :: STM ()
                    deleteRef = modifyTVar' hMap (H.delete n)

                return (incrementRef, deleteRef)

        {- Now for the actual program logic.  Everything past this point only
           uses the approved STM API (i.e. 'getCounters' and 'init').  If I
           wanted, I could factor the above approved STM API into a separate
           module to enforce the encapsulation boundary, but I am lazy. -}

        {- Fork a thread which polls the current state of the counters and
           displays it to the console.  There is a way to implement this
           without polling, but this gets the job done for now.

           Most of what it is doing is just some simple tricks to reuse the
           same console line instead of outputting a stream of lines.
           Otherwise it would be just:

           forkIO $ forever $ do
               ns <- atomically getCounters
               print ns
        -}
        forkIO $ (`evalStateT` 0) $ forever $ do
            -- The state is the length of the previously printed string.
            del <- get
            lift $ do
                putStr (replicate del '\b')
                putStr (replicate del ' ' )
                putStr (replicate del '\b')
            ns <- lift $ atomically getCounters
            let str = show ns
            lift $ putStr str
            put $! length str
            lift $ threadDelay 10000

        {- Fork a thread for each incoming connection, which listens to the
           client's commands and translates them into 'STM' actions -}
        serve HostAny "8080" $ \(socket, _) -> do
            (increment, delete) <- atomically init

            {- Right now, just do the dumb thing and convert all keypresses
               into increment commands, with the exception of the 'q' key,
               which will quit -}
            let handler :: (Proxy p) => () -> Consumer p Char IO ()
                handler () = runIdentityP loop
                  where
                    loop = do
                        c <- request ()
                        unless (c == 'q') $ do
                            lift $ atomically increment
                            loop

            {- This uses my 'pipes' library.  It is basically a high-level way
               to say:

               * Read binary packets from the socket no bigger than 4096 bytes

               * Get the first character from each packet and discard the rest

               * Handle the character using the above 'handler' function -}
            runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

            {- The above pipeline finishes either when the socket closes or
               'handler' stops looping because it received a 'q'.  Either case
               means that the client is done, so we log them out using
               'delete'. -}
            atomically delete
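The STM part of the server can be tried on its own, without any networking. Below is a minimal sketch of that idea under a few assumptions: it uses `Data.Map.Strict` from containers instead of the hash map, wraps the two references in a made-up `Registry` record, and calls the client-facing command `register`. Unlike the server code above, the increment action here updates the client's own `TVar` directly instead of looking it up in the map first.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

-- Hypothetical record bundling the two shared references.
data Registry = Registry
  { nextId   :: TVar Int                      -- source of fresh client ids
  , counters :: TVar (M.Map Int (TVar Int))   -- one counter TVar per client
  }

newRegistry :: IO Registry
newRegistry = Registry <$> newTVarIO 0 <*> newTVarIO M.empty

-- Register a client; returns its increment action and its log-off action.
-- Both actions close over the client's own id and counter reference.
register :: Registry -> STM (STM (), STM ())
register reg = do
  n <- readTVar (nextId reg)
  writeTVar (nextId reg) $! n + 1
  ref <- newTVar 0
  modifyTVar' (counters reg) (M.insert n ref)
  return (modifyTVar' ref (+ 1), modifyTVar' (counters reg) (M.delete n))

-- Read every counter in one consistent snapshot, ordered by client id.
getCounters :: Registry -> STM [Int]
getCounters reg = readTVar (counters reg) >>= mapM readTVar . M.elems

main :: IO ()
main = do
  reg <- newRegistry
  (incA, offA) <- atomically (register reg)
  (incB, _)    <- atomically (register reg)
  atomically (incA >> incA >> incB)
  atomically (getCounters reg) >>= print   -- prints [2,1]
  atomically offA
  atomically (getCounters reg) >>= print   -- prints [1]
```

Since the map itself is only written on log on and log off, clients incrementing their own counters do not invalidate each other's transactions.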
Next up is the client, which simply opens a connection and forwards all key presses as single packets:
    import Control.Monad
    import Control.Proxy
    import Control.Proxy.Safe
    import Control.Proxy.TCP.Safe
    import Data.ByteString.Char8 (pack)
    import System.IO

    main = do
        hSetBuffering stdin NoBuffering
        hSetEcho      stdin False

        {- Again, this uses my 'pipes' library.  It basically says:

            * Read characters from the console using 'commands'

            * Pack them into a binary format

            * Send them to the server on port 8080

            This finishes looping when the user types a 'q' or the connection
            is closed for whatever reason.
        -}
        runSafeIO $ runProxy $ runEitherK $
             try . commands
         >-> mapD (\c -> pack [c])
         >-> connectWriteD Nothing "" "8080"  -- put the server's host in the empty string

    commands :: (Proxy p) => () -> Producer p Char IO ()
    commands () = runIdentityP loop
      where
        loop = do
            c <- lift getChar
            respond c
            unless (c == 'q') loop
It's pretty simple: commands generates a stream of Chars, which get converted to ByteStrings and then sent as packets to the server.
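The conversion at both ends can be sketched in isolation. The helper names `encodeKey` and `decodeKey` are made up for this illustration. The decoder uses `uncons` so that an empty packet yields `Nothing`, whereas `B.head` as used in the server is partial and would fail on an empty ByteString. Note that `Data.ByteString.Char8` truncates characters to 8 bits.

```haskell
import qualified Data.ByteString.Char8 as B

-- Client side: one key press becomes a one-byte packet.
encodeKey :: Char -> B.ByteString
encodeKey c = B.pack [c]

-- Server side: keep the first character of a packet, if there is one,
-- and discard the rest.
decodeKey :: B.ByteString -> Maybe Char
decodeKey = fmap fst . B.uncons

main :: IO ()
main = do
  print (decodeKey (encodeKey 'a'))   -- prints Just 'a'
  print (decodeKey (B.pack "xyz"))    -- prints Just 'x'
  print (decodeKey B.empty)           -- prints Nothing
```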
If you run the server and a few clients and have them each type in a few keys, the server will display an output list showing how many keys each client typed:
... and if some of the clients disconnect, they will be removed from the list:
Note that the pipes component of these examples will simplify greatly in the upcoming pipes-4.0.0 release, but the current pipes ecosystem still gets the job done as is.
