Workers and Concurrency in GoloScript
Complete guide to concurrent programming with workers, channels, and shared state in GoloScript.
Introduction
GoloScript offers a Go-inspired concurrency model with:
- Workers: parallel tasks launched with
spawnand synchronized withawait - Channels: type-safe communication between workers
- SharedState: sharing mutable data with automatic synchronization
- Mutex: low-level control for advanced cases
Key Concepts
# Launch a parallel worker
let future = spawn myFunction(arg1, arg2)
# Wait for the result
let result = await future
# Create a channel for communication
let ch = channel() # Unbuffered (blocking)
let ch = channel(10) # Buffered (capacity 10)
# Send and receive
chanSend(ch, value)
let value = chanReceive(ch)
# Thread-safe shared state
let state = sharedState(initialValue)
sharedSet(state, newValue)
let value = sharedGet(state)
Quick Start
The fastest way to understand concurrency in Golo:
import gololang.Errors
function main = |args| {
# 1. Basic parallel workers
let task1 = spawn |x| {
sleep(100)
return x * 2
}(10)
let task2 = spawn |x| {
sleep(50)
return x * 3
}(10)
println("Result 1:", await task1) # 20
println("Result 2:", await task2) # 30
# 2. Communication via channel
let messages = channel(5)
# Sender worker
let sender = spawn |messages| {
chanSend(messages, "Hello")
chanSend(messages, "from")
chanSend(messages, "Golo")
}(messages)
# Receiver worker
let receiver = spawn |messages| {
let msg1 = chanReceive(messages)
let msg2 = chanReceive(messages)
let msg3 = chanReceive(messages)
println("Message:", msg1, msg2, msg3)
}(messages)
await sender
await receiver
# 3. Shared state
let counter = sharedState(0)
let incrementer = spawn |counter| {
var i = 1
while i <= 5 {
let current = sharedGet(counter)
sharedSet(counter, current + 1)
sleep(50)
i = i + 1
}
}(counter)
await incrementer
println("Final counter:", sharedGet(counter)) # 5
}
Workers with spawn/await
Concept
Workers allow you to execute code in parallel. spawn launches a worker and immediately returns a Future, await waits for the result.
Syntax
# Spawn with a named function
let future = spawn myFunction(arg1, arg2)
# Spawn with a closure
let future = spawn |x, y| {
# Code to execute in parallel
return x + y
}(10, 20)
# Wait for the result
let result = await future
Examples
Simple parallel workers
# Worker that prints numbers
function numberWorker = {
var i = 1
while i <= 10 {
println("Number:", i)
sleep(1000)
i = i + 1
}
return "Numbers completed"
}
# Worker that prints letters
function letterWorker = {
let letters = ["a", "b", "c", "d", "e"]
var i = 0
while i < len(letters) {
println("Letter:", letters[i])
sleep(500)
i = i + 1
}
return "Letters completed"
}
function main = |args| {
# Launch both workers in parallel
let numbersTask = spawn numberWorker()
let lettersTask = spawn letterWorker()
# Wait for them to complete
let result1 = await numbersTask
let result2 = await lettersTask
println(result1)
println(result2)
}
Intensive parallel computations
function fibonacci = |n| {
if n <= 1 {
return n
}
return fibonacci(n - 1) + fibonacci(n - 2)
}
function main = |args| {
# Launch multiple computations in parallel
let f1 = spawn fibonacci(35)
let f2 = spawn fibonacci(36)
let f3 = spawn fibonacci(37)
# Retrieve the results
println("fibonacci(35) =", await f1)
println("fibonacci(36) =", await f2)
println("fibonacci(37) =", await f3)
}
Multiple awaits (cached result)
let future = spawn fibonacci(35)
# First await - computes the result
let result1 = await future
# Second await - cached result (instant)
let result2 = await future
# Results are identical
println(result1 == result2) # true
Error Handling
let future = spawn myFunction()
try {
let result = await future
println("Result:", result)
} catch(e) {
println("Error:", e)
}
Channels for Communication
Channel Types
# Unbuffered channel (blocking)
let ch = channel()
# Buffered channel (non-blocking until max capacity)
let ch = channel(10)
Basic Operations
# Send (blocks if channel is unbuffered or full)
chanSend(ch, value)
# Receive (blocks until a value is available)
let value = chanReceive(ch)
# Close a channel
chanClose(ch)
Examples
Simple communication
function main = |args| {
let ch = channel()
# Sender worker
let sender = spawn |ch| {
chanSend(ch, "Hello")
chanSend(ch, "World")
}(ch)
# Receiver worker
let receiver = spawn |ch| {
let msg1 = chanReceive(ch)
let msg2 = chanReceive(ch)
println(msg1, msg2)
}(ch)
await sender
await receiver
}
Buffered channel
# Create a channel with buffer size 3
let ch = channel(3)
# Can send 3 messages without blocking
chanSend(ch, "Message 1")
chanSend(ch, "Message 2")
chanSend(ch, "Message 3")
println("3 messages sent (non-blocking)")
# Receive the messages
println(chanReceive(ch))
println(chanReceive(ch))
println(chanReceive(ch))
Result collector
function main = |args| {
let results = channel(5)
# Launch multiple workers that compute
let worker1 = spawn |results| {
sleep(200)
chanSend(results, "Worker 1: " + str(10 + 5))
return "done"
}(results)
let worker2 = spawn |results| {
sleep(100)
chanSend(results, "Worker 2: " + str(20 * 3))
return "done"
}(results)
let worker3 = spawn |results| {
sleep(300)
chanSend(results, "Worker 3: " + str(100 - 25))
return "done"
}(results)
# Collect results (arrive in order of completion)
println("Results:")
println(chanReceive(results)) # Worker 2 (fastest)
println(chanReceive(results)) # Worker 1
println(chanReceive(results)) # Worker 3
await worker1
await worker2
await worker3
}
Processing pipeline
function main = |args| {
let numbers = channel(5)
let squares = channel(5)
# Stage 1: Number generator
let generator = spawn |numbers| {
var i = 1
while i <= 5 {
chanSend(numbers, i)
i = i + 1
}
chanClose(numbers)
return "done"
}(numbers)
# Stage 2: Square calculator
let squarer = spawn |numbers, squares| {
var i = 1
while i <= 5 {
let n = chanReceive(numbers)
let square = n * n
println(n, "->", square)
chanSend(squares, square)
i = i + 1
}
chanClose(squares)
return "done"
}(numbers, squares)
# Stage 3: Final consumer
let consumer = spawn |squares| {
var sum = 0
var i = 1
while i <= 5 {
let square = chanReceive(squares)
sum = sum + square
i = i + 1
}
println("Sum of squares:", sum)
return "done"
}(squares)
await generator
await squarer
await consumer
}
Workers with bidirectional communication
0
while i < count {
let event = chanReceive(events)
collected = push(collected, event)
println("[Collector] Received:", event)
i = i + 1
}
return collected
}
function main = |args| {
let events = channel(20)
# Launch the workers
let numbersTask = spawn numberWorker(events)
let collectorTask = spawn eventCollector(events, 11)
await numbersTask
let allEvents = await collectorTask
println("Total events:", len(allEvents))
}
Shared State (SharedState)
SharedState API
SharedState provides a high-level API for sharing data between workers in a thread-safe manner.
# Create shared state
let state = sharedState(initialValue)
# Read the value (thread-safe)
let value = sharedGet(state)
# Modify the value (thread-safe)
sharedSet(state, newValue)
Examples
Shared accumulator
function main = |args| {
let total = sharedState(0)
let worker1 = spawn |total| {
sleep(100)
let result = 10 + 5
sharedSet(total, result)
println("[Worker 1] Computed:", result)
return "done"
}(total)
let worker2 = spawn |total| {
sleep(200)
let result = 20 * 3
let current = sharedGet(total)
sharedSet(total, current + result)
println("[Worker 2] Computed:", result, "total =", current + result)
return "done"
}(total)
let worker3 = spawn |total| {
sleep(300)
let result = 100 - 25
let current = sharedGet(total)
sharedSet(total, current + result)
println("[Worker 3] Computed:", result, "total =", current + result)
return "done"
}(total)
await worker1
await worker2
await worker3
let final = sharedGet(total)
println("Final total:", final) # 150
}
Shared list
function main = |args| {
let messages = sharedState(array())
let numberWorker = spawn |messages| {
var i = 1
while i <= 5 {
println("[Numbers]", i)
# Add to the shared list
let current = sharedGet(messages)
let newList = push(current, "Number: " + str(i))
sharedSet(messages, newList)
sleep(300)
i = i + 1
}
return "done"
}(messages)
let letterWorker = spawn |messages| {
let letters = ["a", "b", "c", "d", "e"]
var i = 0
while i < len(letters) {
println("[Letters]", letters[i])
# Add to the shared list
let current = sharedGet(messages)
let newList = push(current, "Letter: " + letters[i])
sharedSet(messages, newList)
sleep(400)
i = i + 1
}
return "done"
}(messages)
await numberWorker
await letterWorker
let allMessages = sharedGet(messages)
println("Messages collected:", len(allMessages))
var i = 0
while i < len(allMessages) {
println(" ", allMessages[i])
i = i + 1
}
}
Worker coordination
function main = |args| {
# Use SharedState to indicate when a worker is ready
let status = sharedState("initializing")
# Worker that prepares
let prepWorker = spawn |status| {
println("[Prep] Preparing...")
sleep(500)
sharedSet(status, "ready")
println("[Prep] Ready!")
return "done"
}(status)
# Worker that waits for preparation
let mainWorker = spawn |status| {
println("[Main] Waiting...")
# Wait for status to be "ready"
let ready = false
while ready == false {
let currentStatus = sharedGet(status)
if currentStatus == "ready" {
ready = true
} else {
sleep(100)
}
}
println("[Main] Starting work...")
sleep(200)
println("[Main] Completed!")
sharedSet(status, "completed")
return "done"
}(status)
await prepWorker
await mainWorker
let finalStatus = sharedGet(status)
println("Final status:", finalStatus)
}
Mutex (low-level)
For advanced cases requiring fine control:
let mut = mutex()
let sharedData = sharedState(0)
let worker = spawn |mut, sharedData| {
# Acquire the mutex
mutexLock(mut)
# Critical section
let current = sharedGet(sharedData)
println("Value:", current)
sharedSet(sharedData, current + 10)
# Release the mutex
mutexUnlock(mut)
return "done"
}(mut, sharedData)
await worker
Note: Prefer SharedState which automatically handles synchronization. Only use mutex() if you need low-level control.
Advanced Patterns
Task queue system
A common pattern: multiple workers consume tasks from a shared queue.
function worker = |id, input, output, count| {
println("[Worker", id, "] Started")
var processed = 0
while processed < count {
let task = chanReceive(input)
println("[Worker", id, "] Processing:", task)
sleep(100) # Simulate processing
chanSend(output, "Result of " + task)
processed = processed + 1
}
println("[Worker", id, "] Completed")
return "done"
}
function main = |args| {
let input = channel(10)
let output = channel(10)
# Send all tasks
var i = 1
while i <= 9 {
chanSend(input, "Task-" + str(i))
i = i + 1
}
# Launch 3 workers (each processes 3 tasks)
let w1 = spawn worker(1, input, output, 3)
let w2 = spawn worker(2, input, output, 3)
let w3 = spawn worker(3, input, output, 3)
# Collect results
var i = 1
while i <= 9 {
let result = chanReceive(output)
println("Received:", result)
i = i + 1
}
await w1
await w2
await w3
}
Fan-out / Fan-in
# Fan-out: One producer, multiple consumers
let tasks = channel(10)
# Producer
let producer = spawn |tasks| {
var i = 1
while i <= 10 {
chanSend(tasks, i)
i = i + 1
}
}(tasks)
# Consumers (fan-out)
let consumer1 = spawn |tasks| { /* process */ }(tasks)
let consumer2 = spawn |tasks| { /* process */ }(tasks)
# Fan-in: Multiple producers, one consumer
let results = channel(10)
let worker1 = spawn |results| { chanSend(results, "A") }(results)
let worker2 = spawn |results| { chanSend(results, "B") }(results)
# Single collector
let collector = spawn |results| {
let all = array()
all = push(all, chanReceive(results))
all = push(all, chanReceive(results))
return all
}(results)
Best Practices
✅ Do
-
Always
awaitthe futures you createlet future = spawn myFunction() # ... await future # Don't forget! -
Use buffered channels to avoid deadlocks
# Better: buffered let ch = channel(10) # Risky: unbuffered can cause deadlock let ch = channel() -
Close channels when you’re done sending
var i = 1 while i <= 10 { chanSend(ch, i) i = i + 1 } chanClose(ch) -
Prefer
SharedStateovermutexfor simplicity# ✅ Simple and safe let state = sharedState(0) sharedSet(state, 42) # ❌ More complex and risky let mut = mutex() mutexLock(mut) # ... mutexUnlock(mut) -
Use
sleep()to control timingsleep(100) # 100ms pause
❌ Don’t
-
Don’t forget
await# ❌ The worker runs but the result is lost spawn myFunction() # ✅ Capture the future and await let future = spawn myFunction() await future -
Don’t share mutable data without synchronization
# ❌ Race condition! let data = array() spawn |data| { data = push(data, 1) }(data) spawn |data| { data = push(data, 2) }(data) # ✅ Use SharedState let data = sharedState(array()) spawn |data| { let current = sharedGet(data) sharedSet(data, push(current, 1)) }(data) -
Don’t create too many workers
# ❌ Too many workers (overhead) var i = 1 while i <= 10000 { spawn myFunction() i = i + 1 } # ✅ Reasonable worker pool let numWorkers = 10 # Distribute work among 10 workers -
Don’t block indefinitely on a channel
# ❌ If nobody sends, blocks forever let msg = chanReceive(ch) # ✅ Use a timeout or check first
Recommended Patterns
| Pattern | Use Case | Tool |
|---|---|---|
| Parallel computations | Process independent data | spawn / await |
| Communication | Exchange messages between workers | channel() |
| Data sharing | Access mutable data | sharedState() |
| Synchronization | Coordinate multiple workers | SharedState + polling |
| Pipeline | Chain transformations | Multiple channels |
| Worker pool | Process a task queue | Channel + fixed workers |
Summary
Concurrency in GoloScript offers:
- Lightweight workers:
spawn/awaitfor parallelism - Type-safe channels: Structured communication between workers
- SharedState: Data sharing with automatic synchronization
- Go-like patterns: Fan-out, fan-in, pipelines, worker pools
Recommendation: Start with spawn/await for simple computations, add channels for communication, and use SharedState when you need to share mutable data.