自然语言处理 |如何使用 Execnet 和 Redis 对单词进行评分
可以同时使用 Redis 和 Execnet 执行分布式单词评分。对于movie_reviews语料库中的每个单词,使用FreqDist和ConditionalFreqDist计算信息增益。
使用 >RedisHashFreqDist 和RedisConditionalHashFreqDist ,可以使用 Redis 执行相同的操作。然后将分数存储在RedisOrderedDict中。为了从 Redis 中获得更好的性能,使用 Execnet 来分配计数。
Redis 和 execnet 用于一起做分布式单词评分。使用 FreqDist 和 ConditionalFreqDist 计算每个单词的信息增益。现在使用 Redis,可以使用RedisHashFreqDist和RedisConditionalHashFreqDist执行相同的操作,然后将分数存储在RedisOrderedDict中。 Execnet 可用于分配计数,以便从 Redis 中获得更好的性能。安装 Redis 和 execnet 后, redis-server的实例必须在 localhost 上运行。
脚步:
- 对于 movie_reviews 语料库中的每个标签(只有 pos 和 neg 标签),首先获取一个元组列表——标签和单词。
- 然后从 dist_featx 模块中,使用 score_words() 获取 word_scores。
- 单词总数为 39、764,word_scores函数是RedisOrderedDict的一个实例。
- 然后获取前 1, 000 个单词并使用 keys() 方法检查前 5 个单词以查看它们是什么。
- 从 word_scores 获得所有需要的信息后删除 Redis 中的键,因为不再需要数据。
代码 :
Python3
# importing libraries
from dist_featx import score_words
from nltk.corpus import movie_reviews
# finding category via categoreies
category = movie_reviews.categories()
print ("Categories : ", category)
category_words = [
(l, movie_reviews.words(categories = [l]))
for l in category]
# Scores
word_scores = score_words(category_words)
print ("Length : ", len(word_scores))
# top words
topn_words = word_scores.keys(end = 1000)
print ("Top Words : ", topn_words[0:5])
# Delete the keys in Redis after getting
# all the required from word_scores
from redis import Redis
r = Redis()
print ([r.delete(key) for
key in ['word_fd', 'label_word_fd:neg',
'label_word_fd:pos', 'word_scores']] )
Python3
# Importing library
import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict
# Scoring the words
def score_words(category_words,
score_fn = BigramAssocMeasures.chi_sq,
host ='localhost', specs =[('popen', 2)]):
gateways = []
channels = []
# counting
for spec, count in specs:
for i in range(count):
gw = execnet.makegateway(spec)
gateways.append(gw)
channel = gw.remote_exec(remote_word_count)
channel.send((host, 'word_fd', 'category_word_fd'))
channels.append(channel)
cyc = itertools.cycle(channels)
# syncing the channel
for category, words in category_words:
channel = next(cyc)
channel.send((category, list(words)))
for channel in channels:
channel.send('done')
assert 'done' == channel.receive()
channel.waitclose(5)
for gateway in gateways:
gateway.exit()
r = Redis(host)
# frequency distribution
fd = RedisHashFreqDist(r, 'word_fd')
cfd = RedisConditionalHashFreqDist(r, 'category_word_fd')
word_scores = RedisOrderedDict(r, 'word_scores')
n_xx = cfd.N()
for category in cfd.conditions():
n_xi = cfd[category].N()
for word, n_ii in cfd[category].iteritems():
word = word.decode()
n_ix = fd[word]
if n_ii and n_ix and n_xi and n_xx:
score = score_fn(n_ii, (n_ix, n_xi), n_xx)
word_scores[word] = score
# final word scores
return word_scores
输出 :
Categories : ['neg', 'pos']
Length : 39767
Top Words : [b'bad', b', ', b'and', b'?', b'movie']
[1, 1, 1, 1]
score_words()是 dist_featx 的一个函数。但预计会等待一段时间,因为它需要一些时间才能完成。使用 execnet 和 Redis 的开销意味着它比函数的非分布式内存版本花费的时间要长得多。
这个怎么运作?
dist_featx.py模块包含 score_words()函数,它执行以下操作:
- 打开网关和通道。
- 将初始化数据发送到每个通道。
- 为了计数,它通过一个通道发送每个(标签,单词)元组。
- 向每个通道发送完成消息。
- 等待完成回复。
- 关闭通道和网关。
- 根据计数计算每个单词的分数。
- 将分数存储在 RedisOrderedDict 中。
计数完成后,对所有单词进行评分并存储结果。代码如下:
代码 :
Python3
# Importing library
import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict
# Scoring the words
def score_words(category_words,
score_fn = BigramAssocMeasures.chi_sq,
host ='localhost', specs =[('popen', 2)]):
gateways = []
channels = []
# counting
for spec, count in specs:
for i in range(count):
gw = execnet.makegateway(spec)
gateways.append(gw)
channel = gw.remote_exec(remote_word_count)
channel.send((host, 'word_fd', 'category_word_fd'))
channels.append(channel)
cyc = itertools.cycle(channels)
# syncing the channel
for category, words in category_words:
channel = next(cyc)
channel.send((category, list(words)))
for channel in channels:
channel.send('done')
assert 'done' == channel.receive()
channel.waitclose(5)
for gateway in gateways:
gateway.exit()
r = Redis(host)
# frequency distribution
fd = RedisHashFreqDist(r, 'word_fd')
cfd = RedisConditionalHashFreqDist(r, 'category_word_fd')
word_scores = RedisOrderedDict(r, 'word_scores')
n_xx = cfd.N()
for category in cfd.conditions():
n_xi = cfd[category].N()
for word, n_ii in cfd[category].iteritems():
word = word.decode()
n_ix = fd[word]
if n_ii and n_ix and n_xi and n_xx:
score = score_fn(n_ii, (n_ix, n_xi), n_xx)
word_scores[word] = score
# final word scores
return word_scores
如果有两个以上的标签,则应使用不同的评分方法。要比较两个标签,评分方法只会是准确的。这些要求将决定您如何存储单词分数。
拥有实例后,可以通过通道接收两种数据 -
- 完成消息:它表示没有更多数据通过通道进入。
回复另一个 done 消息,最后退出循环以关闭通道。 - (label, words) 的 2 元组:它用于迭代以增加RedisHashFreqDist和RedisConditionalHashFreqDist中的计数