1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
|
- """
- bpe is short for Byte Pair Encoder. It translates arbitrary utf-8 strings into
- sequences of integers, where each integer represents small chunks of commonly
- occuring characters. This implementation is based on openai's gpt2 encoder.py:
- https://github.com/openai/gpt-2/blob/master/src/encoder.py
- but was mildly modified because the original implementation is a bit confusing.
- I also tried to add as many comments as possible, my own understanding of what's
- going on.
- """
- import os
- import json
- import regex as re
- import requests
- import torch
- # -----------------------------------------------------------------------------
- def bytes_to_unicode():
- """
- Every possible byte (really an integer 0..255) gets mapped by OpenAI to a unicode
- character that represents it visually. Some bytes have their appearance preserved
- because they don't cause any trouble. These are defined in list bs. For example:
- chr(33) returns "!", so in the returned dictionary we simply have d[33] -> "!".
- However, chr(0), for example, is '\x00', which looks ugly. So OpenAI maps these
- bytes, into new characters in a range where chr() returns a single nice character.
- So in the final dictionary we have d[0] -> 'Ā' instead, which is just chr(0 + 2**8).
- In particular, the space character is 32, which we can see by ord(' '). Instead,
- this function will shift space (32) by 256 to 288, so d[32] -> 'Ġ'.
- So this is just a simple one-to-one mapping of bytes 0..255 into unicode characters
- that "look nice", either in their original form, or a funny shifted character
- like 'Ā', or 'Ġ', etc.
- """
- # the 188 integers that render fine in their original form and need no shifting
- bs = list(range(ord("!"), ord("~")+1))+list(range(ord("¡"), ord("¬")+1))+list(range(ord("®"), ord("ÿ")+1))
- cs = bs[:] # all integers b in bs will simply map to chr(b) in the output dict
- # now get the representations of the other 68 integers that do need shifting
- # each will get mapped chr(256 + n), where n will grow from 0...67 in the loop
- n = 0
- for b in range(2**8):
- if b not in bs:
- # if this byte is "ugly" then map it to the next available "nice" character
- bs.append(b)
- cs.append(2**8+n)
- n += 1
- cs = [chr(n) for n in cs]
- d = dict(zip(bs, cs))
- return d
- def get_pairs(word):
- """
- Return all bigrams as a set of tuples, of consecutive elements in the iterable word.
- """
- pairs = set()
- prev_char = word[0]
- for char in word[1:]:
- pairs.add((prev_char, char))
- prev_char = char
- return pairs
- class Encoder:
- def __init__(self, encoder, bpe_merges):
- # byte encoder/decoder
- self.byte_encoder = bytes_to_unicode()
- self.byte_decoder = {v:k for k, v in self.byte_encoder.items()}
- # bpe token encoder/decoder
- self.encoder = encoder
- self.decoder = {v:k for k,v in self.encoder.items()}
- # bpe merge list that defines the bpe "tree", of tuples (a,b) that are to merge to token ab
- self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
- # the splitting pattern used for pre-tokenization
- # Should haved added re.IGNORECASE so BPE merges can happen for capitalized versions of contractions <-- original openai comment
- """
- ok so what is this regex looking for, exactly?
- python re reference: https://docs.python.org/3/library/re.html
- - the vertical bars | is OR, so re.findall will chunkate text as the pieces match, from left to right
- - '\'s' would split up things like Andrej's -> (Andrej, 's)
- - ' ?\p{L}': optional space followed by 1+ unicode code points in the category "letter"
- - ' ?\p{N}': optional space followed by 1+ unicode code points in the category "number"
- - ' ?[^\s\p{L}\p{N}]+': optional space, then 1+ things that are NOT a whitespace, letter or number
- - '\s+(?!\S)': 1+ whitespace characters (e.g. space or tab or etc) UNLESS they are followed by non-whitespace
- so this will consume whitespace characters in a sequence but exclude the last whitespace in
- that sequence. that last whitespace has the opportunity to then match the optional ' ?' in
- earlier patterns.
- - '\s+': 1+ whitespace characters, intended probably to catch a full trailing sequence of whitespaces at end of string
- So TLDR:
- - we are special casing a few common apostrophe constructs ('s, 't, 're, ...) and making those into separate tokens
- - we then separate out strings into consecutive chunks of 1) letters, 2) numbers, 3) non-letter-numbers, 4) whitespaces
- """
- self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
- self.cache = {}
- def bpe(self, token):
- """
- this function uses self.bpe_ranks to iteratively merge all the possible bpe tokens
- up the tree. token is a string of one individual 'word' (after regex tokenization)
- and after byte encoding, e.g. 'Ġthere'.
- """
- # token is a string of one individual 'word', after byte encoding, e.g. 'Ġthere'
- # memoization, for efficiency
- if token in self.cache:
- return self.cache[token]
- word = tuple(token) # individual characters that make up the token, in a tuple
- pairs = get_pairs(word) # get all bigrams
- if not pairs:
- return token
- while True:
- # find the next lowest rank bigram that can be merged
- bigram = min(pairs, key = lambda pair: self.bpe_ranks.get(pair, float('inf')))
- if bigram not in self.bpe_ranks:
- break # no more bigrams are eligible to be merged
- first, second = bigram
- # we will now replace all occurences of (first, second) in the list of current
- # words into one merged token first_second, in the output list new_words
- new_word = []
- i = 0
- while i < len(word):
- # find the next occurence of first in the sequence of current words
- try:
- j = word.index(first, i)
- new_word.extend(word[i:j])
- i = j
- except:
- new_word.extend(word[i:])
- break
- # if this occurence is also followed by second, then merge them into one
- if word[i] == first and i < len(word)-1 and word[i+1] == second:
- new_word.append(first+second)
- i += 2
- else:
- new_word.append(word[i])
- i += 1
- # all occurences of (first, second) have been merged to first_second
- new_word = tuple(new_word)
- word = new_word
- if len(word) == 1:
- break
- else:
- pairs = get_pairs(word)
- # concat all words into a string, and use ' ' as the separator. Note that
- # by now all characters have been byte encoded, guaranteeing that ' ' is
- # not used in the actual data and is a 'special' delimiter character
- word = ' '.join(word)
- # cache the result and return
- self.cache[token] = word
- return word
- def encode(self, text):
- """ string goes in, list of integers comes out """
- bpe_idx = []
- # pre-tokenize the input text into string tokens (words, roughly speaking)
- tokens = re.findall(self.pat, text)
- # process each token into BPE integers
- for token in tokens:
- # encode the token as a bytes (b'') object
- token_bytes = token.encode('utf-8')
- # translate all bytes to their unicode string representation and flatten
- token_translated = ''.join(self.byte_encoder[b] for b in token_bytes)
- # perform all the applicable bpe merges according to self.bpe_ranks
- token_merged = self.bpe(token_translated).split(' ')
- # translate all bpe tokens to integers
- token_ix = [self.encoder[bpe_token] for bpe_token in token_merged]
- # extend our running list of all output integers
- bpe_idx.extend(token_ix)
- return bpe_idx
- def encode_and_show_work(self, text):
- """ debugging function, same as encode but returns all intermediate work """
- bpe_idx = []
- parts = []
- tokens = re.findall(self.pat, text)
- for token in tokens:
- token_bytes = token.encode('utf-8')
- token_translated = ''.join(self.byte_encoder[b] for b in token_bytes)
- token_merged = self.bpe(token_translated).split(' ')
- token_ix = [self.encoder[bpe_token] for bpe_token in token_merged]
- bpe_idx.extend(token_ix)
- parts.append({
- 'token': token,
- 'token_bytes': token_bytes,
- 'token_translated': token_translated,
- 'token_merged': token_merged,
- 'token_ix': token_ix,
- })
- out = {
- 'bpe_idx': bpe_idx, # the actual output sequence
- 'tokens': tokens, # result of pre-tokenization
- 'parts': parts, # intermediates for each token part
- }
- return out
- def decode(self, bpe_idx):
- """ list of integers comes in, string comes out """
- # inverse map the integers to get the tokens
- tokens_merged = [self.decoder[token] for token in bpe_idx]
- # inverse the byte encoder, e.g. recovering 'Ġ' -> ' ', and get the bytes
- tokens_flat = ''.join(tokens_merged)
- tokens_bytes = bytearray([self.byte_decoder[c] for c in tokens_flat])
- # recover the full utf-8 string
- text = tokens_bytes.decode('utf-8', errors='replace')
- return text
- def get_file(local_file, remote_file):
- """ downloads remote_file to local_file if necessary """
- if not os.path.isfile(local_file):
- print(f"downloading {remote_file} to {local_file}")
- response = requests.get(remote_file)
- open(local_file, "wb").write(response.content)
- def get_encoder():
- """
- Returns an instance of the GPT BPE Encoder/Decoder
- and handles caching of "database" files.
- """
- home_dir = os.path.expanduser('~')
- cache_dir = os.path.join(home_dir, '.cache', 'mingpt')
- os.makedirs(cache_dir, exist_ok=True)
- # load encoder.json that has the raw mappings from token -> bpe index
- encoder_local_file = os.path.join(cache_dir, 'encoder.json')
- encoder_remote_file = 'https://openaipublic.blob.core.windows.net/gpt-2/models/124M/encoder.json'
- get_file(encoder_local_file, encoder_remote_file)
- with open(encoder_local_file, 'r') as f:
- encoder = json.load(f)
- assert len(encoder) == 50257 # 256 individual byte tokens, 50,000 merged tokens, and 1 special <|endoftext|> token
- # load vocab.bpe that contains the bpe merges, i.e. the bpe tree structure
- # in the form tuples (a, b), that indicate that (a, b) is to be merged to one token ab
- vocab_local_file = os.path.join(cache_dir, 'vocab.bpe')
- vocab_remote_file = 'https://openaipublic.blob.core.windows.net/gpt-2/models/124M/vocab.bpe'
- get_file(vocab_local_file, vocab_remote_file)
- with open(vocab_local_file, 'r', encoding="utf-8") as f:
- bpe_data = f.read()
- # light postprocessing: strip the version on first line and the last line is a blank
- bpe_merges = [tuple(merge_str.split()) for merge_str in bpe_data.split('\n')[1:-1]]
- assert len(bpe_merges) == 50000 # 50,000 merged tokens
- # construct the Encoder object and return
- enc = Encoder(encoder, bpe_merges)
- return enc
- # -----------------------------------------------------------------------------
- class BPETokenizer:
- """ PyTorch-aware class that wraps the Encoder above """
- def __init__(self):
- self.encoder = get_encoder()
- def __call__(self, text, return_tensors='pt'):
- # PyTorch only; here because we want to match huggingface/transformers interface
- assert return_tensors == 'pt'
- # single string input for now, in the future potentially a list of strings
- assert isinstance(text, str)
- # encode and create a "batch dimension" of 1
- idx = [self.encoder.encode(text)]
- # wrap into PyTorch tensor
- out = torch.tensor(idx, dtype=torch.long)
- return out
- def decode(self, idx):
- # ensure a simple 1D tensor for now
- assert idx.ndim == 1
- # decode indices to text
- text = self.encoder.decode(idx.tolist())
- return text
- if __name__ == '__main__':
- # here is an encoding example
- text = "Hello!! I'm Andrej Karpathy. It's 2022. w00t :D 🤗"
- e = get_encoder()
- r = e.encode_and_show_work(text)
- print("Original text is:")
- print(text)
- print("First the text gets pre-tokenized, broken up into chunks, the outcome is:")
- print(r['tokens'])
- # ['Hello', '!!', ' I', "'m", ' Andrej', ' Karpathy', '.', ' It', "'s", ' 2022', '.', ' w', '00', 't', ' :', 'D', ' 🤗']
- print("Then we iterate over each chunk and process them in turn...")
- for part in r['parts']:
- print(part)
- # {'token': 'Hello', 'token_bytes': b'Hello', 'token_translated': 'Hello', 'token_merged': ['Hello'], 'token_ix': [15496]}
- # {'token': '!!', 'token_bytes': b'!!', 'token_translated': '!!', 'token_merged': ['!!'], 'token_ix': [3228]}
- # {'token': ' I', 'token_bytes': b' I', 'token_translated': 'ĠI', 'token_merged': ['ĠI'], 'token_ix': [314]}
- # {'token': "'m", 'token_bytes': b"'m", 'token_translated': "'m", 'token_merged': ["'m"], 'token_ix': [1101]}
- # {'token': ' Andrej', 'token_bytes': b' Andrej', 'token_translated': 'ĠAndrej', 'token_merged': ['ĠAndre', 'j'], 'token_ix': [10948, 73]}
- # {'token': ' Karpathy', 'token_bytes': b' Karpathy', 'token_translated': 'ĠKarpathy', 'token_merged': ['ĠK', 'arp', 'athy'], 'token_ix': [509, 5117, 10036]}
- # {'token': '.', 'token_bytes': b'.', 'token_translated': '.', 'token_merged': ['.'], 'token_ix': [13]}
- # {'token': ' It', 'token_bytes': b' It', 'token_translated': 'ĠIt', 'token_merged': ['ĠIt'], 'token_ix': [632]}
- # {'token': "'s", 'token_bytes': b"'s", 'token_translated': "'s", 'token_merged': ["'s"], 'token_ix': [338]}
- # {'token': ' 2022', 'token_bytes': b' 2022', 'token_translated': 'Ġ2022', 'token_merged': ['Ġ2022'], 'token_ix': [33160]}
- # {'token': '.', 'token_bytes': b'.', 'token_translated': '.', 'token_merged': ['.'], 'token_ix': [13]}
- # {'token': ' w', 'token_bytes': b' w', 'token_translated': 'Ġw', 'token_merged': ['Ġw'], 'token_ix': [266]}
- # {'token': '00', 'token_bytes': b'00', 'token_translated': '00', 'token_merged': ['00'], 'token_ix': [405]}
- # {'token': 't', 'token_bytes': b't', 'token_translated': 't', 'token_merged': ['t'], 'token_ix': [83]}
- # {'token': ' :', 'token_bytes': b' :', 'token_translated': 'Ġ:', 'token_merged': ['Ġ:'], 'token_ix': [1058]}
- # {'token': 'D', 'token_bytes': b'D', 'token_translated': 'D', 'token_merged': ['D'], 'token_ix': [35]}
- # {'token': ' 🤗', 'token_bytes': b' \xf0\x9f\xa4\x97', 'token_translated': 'Ġð٤Ĺ', 'token_merged': ['ĠðŁ', '¤', 'Ĺ'], 'token_ix': [12520, 97, 245]}
- # (refer to the code inside Encoder.encode for what these intermediates are)
- print("and the final outcome is concatenating and flattening all the token_ix:")
- print(r['bpe_idx'])
- # [15496, 3228, 314, 1101, 10948, 73, 509, 5117, 10036, 13, 632, 338, 33160, 13, 266, 405, 83, 1058, 35, 12520, 97, 245]
- # this would then become the integer input sequence to the transformer
- print("ready to feed into a Transformer!")
|